1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <stdlib.h>
23 #include <time.h>
24 #ifdef HAVE_SYS_SOCKET_H
25 #include <sys/socket.h>
26 #endif
27 #include <sstream>
28 #include <fstream>
29
30 #include <boost/mpl/list.hpp>
31 #include <boost/shared_array.hpp>
32 #include <boost/random.hpp>
33 #include <boost/type_traits.hpp>
34 #include <boost/test/unit_test.hpp>
35 #include <boost/version.hpp>
36
37 #include <thrift/transport/TBufferTransports.h>
38 #include <thrift/transport/TFDTransport.h>
39 #include <thrift/transport/TFileTransport.h>
40 #include <thrift/transport/TZlibTransport.h>
41 #include <thrift/transport/TSocket.h>
42
43 #include <thrift/concurrency/FunctionRunner.h>
44 #if _WIN32
45 #include <thrift/transport/TPipe.h>
46 #include <thrift/windows/TWinsockSingleton.h>
47 #endif
48
49 using namespace apache::thrift::transport;
50 using namespace apache::thrift;
51
52 static boost::mt19937 rng;
53
initrand(unsigned int seed)54 void initrand(unsigned int seed) {
55 rng.seed(seed);
56 }
57
58 class SizeGenerator {
59 public:
60 virtual ~SizeGenerator() = default;
61 virtual uint32_t nextSize() = 0;
62 virtual std::string describe() const = 0;
63 };
64
65 class ConstantSizeGenerator : public SizeGenerator {
66 public:
ConstantSizeGenerator(uint32_t value)67 ConstantSizeGenerator(uint32_t value) : value_(value) {}
nextSize()68 uint32_t nextSize() override { return value_; }
describe() const69 std::string describe() const override {
70 std::ostringstream desc;
71 desc << value_;
72 return desc.str();
73 }
74
75 private:
76 uint32_t value_;
77 };
78
79 class RandomSizeGenerator : public SizeGenerator {
80 public:
RandomSizeGenerator(uint32_t min,uint32_t max)81 RandomSizeGenerator(uint32_t min, uint32_t max)
82 : generator_(rng, boost::uniform_int<int>(min, max)) {}
83
nextSize()84 uint32_t nextSize() override { return generator_(); }
85
describe() const86 std::string describe() const override {
87 std::ostringstream desc;
88 desc << "rand(" << getMin() << ", " << getMax() << ")";
89 return desc.str();
90 }
91
getMin() const92 uint32_t getMin() const { return (generator_.distribution().min)(); }
getMax() const93 uint32_t getMax() const { return (generator_.distribution().max)(); }
94
95 private:
96 boost::variate_generator<boost::mt19937&, boost::uniform_int<int> > generator_;
97 };
98
99 /**
100 * This class exists solely to make the TEST_RW() macro easier to use.
101 * - it can be constructed implicitly from an integer
102 * - it can contain either a ConstantSizeGenerator or a RandomSizeGenerator
103 * (TEST_RW can't take a SizeGenerator pointer or reference, since it needs
104 * to make a copy of the generator to bind it to the test function.)
105 */
106 class GenericSizeGenerator : public SizeGenerator {
107 public:
GenericSizeGenerator(uint32_t value)108 GenericSizeGenerator(uint32_t value) : generator_(new ConstantSizeGenerator(value)) {}
GenericSizeGenerator(uint32_t min,uint32_t max)109 GenericSizeGenerator(uint32_t min, uint32_t max)
110 : generator_(new RandomSizeGenerator(min, max)) {}
111
nextSize()112 uint32_t nextSize() override { return generator_->nextSize(); }
describe() const113 std::string describe() const override { return generator_->describe(); }
114
115 private:
116 std::shared_ptr<SizeGenerator> generator_;
117 };
118
119 /**************************************************************************
120 * Classes to set up coupled transports
121 **************************************************************************/
122
123 /**
124 * Helper class to represent a coupled pair of transports.
125 *
126 * Data written to the out transport can be read from the in transport.
127 *
128 * This is used as the base class for the various coupled transport
129 * implementations. It shouldn't be instantiated directly.
130 */
131 template <class Transport_>
132 class CoupledTransports {
133 public:
134 virtual ~CoupledTransports() = default;
135 typedef Transport_ TransportType;
136
CoupledTransports()137 CoupledTransports() : in(), out() {}
138
139 std::shared_ptr<Transport_> in;
140 std::shared_ptr<Transport_> out;
141
142 private:
143 CoupledTransports(const CoupledTransports&) = delete;
144 CoupledTransports& operator=(const CoupledTransports&) = delete;
145 };
146
147 /**
148 * Coupled TMemoryBuffers
149 */
150 class CoupledMemoryBuffers : public CoupledTransports<TMemoryBuffer> {
151 public:
CoupledMemoryBuffers()152 CoupledMemoryBuffers() : buf(new TMemoryBuffer) {
153 in = buf;
154 out = buf;
155 }
156
157 std::shared_ptr<TMemoryBuffer> buf;
158 };
159
160 /**
161 * Helper template class for creating coupled transports that wrap
162 * another transport.
163 */
164 template <class WrapperTransport_, class InnerCoupledTransports_>
165 class CoupledWrapperTransportsT : public CoupledTransports<WrapperTransport_> {
166 public:
CoupledWrapperTransportsT()167 CoupledWrapperTransportsT() {
168 if (inner_.in) {
169 this->in.reset(new WrapperTransport_(inner_.in));
170 }
171 if (inner_.out) {
172 this->out.reset(new WrapperTransport_(inner_.out));
173 }
174 }
175
176 InnerCoupledTransports_ inner_;
177 };
178
179 /**
180 * Coupled TBufferedTransports.
181 */
182 template <class InnerTransport_>
183 class CoupledBufferedTransportsT
184 : public CoupledWrapperTransportsT<TBufferedTransport, InnerTransport_> {};
185
186 typedef CoupledBufferedTransportsT<CoupledMemoryBuffers> CoupledBufferedTransports;
187
188 /**
189 * Coupled TFramedTransports.
190 */
191 template <class InnerTransport_>
192 class CoupledFramedTransportsT
193 : public CoupledWrapperTransportsT<TFramedTransport, InnerTransport_> {};
194
195 typedef CoupledFramedTransportsT<CoupledMemoryBuffers> CoupledFramedTransports;
196
197 /**
198 * Coupled TZlibTransports.
199 */
200 template <class InnerTransport_>
201 class CoupledZlibTransportsT : public CoupledWrapperTransportsT<TZlibTransport, InnerTransport_> {};
202
203 typedef CoupledZlibTransportsT<CoupledMemoryBuffers> CoupledZlibTransports;
204
205 #ifndef _WIN32
206 // FD transport doesn't make much sense on Windows.
207 /**
208 * Coupled TFDTransports.
209 */
210 class CoupledFDTransports : public CoupledTransports<TFDTransport> {
211 public:
CoupledFDTransports()212 CoupledFDTransports() {
213 int pipes[2];
214
215 if (pipe(pipes) != 0) {
216 return;
217 }
218
219 in.reset(new TFDTransport(pipes[0], TFDTransport::CLOSE_ON_DESTROY));
220 out.reset(new TFDTransport(pipes[1], TFDTransport::CLOSE_ON_DESTROY));
221 }
222 };
223 #else
224 /**
225 * Coupled pipe transports
226 */
227 class CoupledPipeTransports : public CoupledTransports<TPipe> {
228 public:
229 HANDLE hRead;
230 HANDLE hWrite;
231
CoupledPipeTransports()232 CoupledPipeTransports() {
233 BOOST_REQUIRE(CreatePipe(&hRead, &hWrite, nullptr, 1048576 * 2));
234 in.reset(new TPipe(hRead, hWrite));
235 in->open();
236 out = in;
237 }
238 };
239 #endif
240
241 /**
242 * Coupled TSockets
243 */
244 class CoupledSocketTransports : public CoupledTransports<TSocket> {
245 public:
CoupledSocketTransports()246 CoupledSocketTransports() {
247 THRIFT_SOCKET sockets[2] = {0};
248 if (THRIFT_SOCKETPAIR(PF_UNIX, SOCK_STREAM, 0, sockets) != 0) {
249 return;
250 }
251
252 in.reset(new TSocket(sockets[0]));
253 out.reset(new TSocket(sockets[1]));
254 out->setSendTimeout(100);
255 }
256 };
257
258 // These could be made to work on Windows, but I don't care enough to make it happen
259 #ifndef _WIN32
260 /**
261 * Coupled TFileTransports
262 */
263 class CoupledFileTransports : public CoupledTransports<TFileTransport> {
264 public:
CoupledFileTransports()265 CoupledFileTransports() {
266 #ifndef _WIN32
267 const char* tmp_dir = "/tmp";
268 #define FILENAME_SUFFIX "/thrift.transport_test"
269 #else
270 const char* tmp_dir = getenv("TMP");
271 #define FILENAME_SUFFIX "\\thrift.transport_test"
272 #endif
273
274 // Create a temporary file to use
275 filename.resize(strlen(tmp_dir) + strlen(FILENAME_SUFFIX));
276 THRIFT_SNPRINTF(&filename[0], filename.size(), "%s" FILENAME_SUFFIX, tmp_dir);
277 #undef FILENAME_SUFFIX
278
279 { std::ofstream dummy_creation(filename.c_str(), std::ofstream::trunc); }
280
281 in.reset(new TFileTransport(filename, true));
282 out.reset(new TFileTransport(filename));
283 }
284
~CoupledFileTransports()285 ~CoupledFileTransports() override { remove(filename.c_str()); }
286
287 std::string filename;
288 };
289 #endif
290
291 /**
292 * Wrapper around another CoupledTransports implementation that exposes the
293 * transports as TTransport pointers.
294 *
295 * This is used since accessing a transport via a "TTransport*" exercises a
296 * different code path than using the base pointer class. As part of the
297 * template code changes, most transport methods are no longer virtual.
298 */
299 template <class CoupledTransports_>
300 class CoupledTTransports : public CoupledTransports<TTransport> {
301 public:
CoupledTTransports()302 CoupledTTransports() : transports() {
303 in = transports.in;
304 out = transports.out;
305 }
306
307 CoupledTransports_ transports;
308 };
309
310 /**
311 * Wrapper around another CoupledTransports implementation that exposes the
312 * transports as TBufferBase pointers.
313 *
314 * This can only be instantiated with a transport type that is a subclass of
315 * TBufferBase.
316 */
317 template <class CoupledTransports_>
318 class CoupledBufferBases : public CoupledTransports<TBufferBase> {
319 public:
CoupledBufferBases()320 CoupledBufferBases() : transports() {
321 in = transports.in;
322 out = transports.out;
323 }
324
325 CoupledTransports_ transports;
326 };
327
328 /**************************************************************************
329 * Alarm handling code for use in tests that check the transport blocking
330 * semantics.
331 *
332 * If the transport ends up blocking, we don't want to hang forever. We use
333 * SIGALRM to fire schedule signal to wake up and try to write data so the
334 * transport will unblock.
335 *
336 * It isn't really the safest thing in the world to be mucking around with
337 * complicated global data structures in a signal handler. It should probably
338 * be okay though, since we know the main thread should always be blocked in a
339 * read() request when the signal handler is running.
340 **************************************************************************/
341
342 struct TriggerInfo {
TriggerInfoTriggerInfo343 TriggerInfo(int seconds, const std::shared_ptr<TTransport>& transport, uint32_t writeLength)
344 : timeoutSeconds(seconds), transport(transport), writeLength(writeLength), next(nullptr) {}
345
346 int timeoutSeconds;
347 std::shared_ptr<TTransport> transport;
348 uint32_t writeLength;
349 TriggerInfo* next;
350 };
351
352 apache::thrift::concurrency::Monitor g_alarm_monitor;
353 TriggerInfo* g_triggerInfo;
354 unsigned int g_numTriggersFired;
355 bool g_teardown = false;
356
alarm_handler()357 void alarm_handler() {
358 TriggerInfo* info = nullptr;
359 {
360 apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
361 // The alarm timed out, which almost certainly means we're stuck
362 // on a transport that is incorrectly blocked.
363 ++g_numTriggersFired;
364
365 // Note: we print messages to stdout instead of stderr, since
366 // tools/test/runner only records stdout messages in the failure messages for
367 // boost tests. (boost prints its test info to stdout.)
368 printf("Timeout alarm expired; attempting to unblock transport\n");
369 if (g_triggerInfo == nullptr) {
370 printf(" trigger stack is empty!\n");
371 }
372
373 // Pop off the first TriggerInfo.
374 // If there is another one, schedule an alarm for it.
375 info = g_triggerInfo;
376 g_triggerInfo = info->next;
377 }
378
379 // Write some data to the transport to hopefully unblock it.
380 auto* buf = new uint8_t[info->writeLength];
381 memset(buf, 'b', info->writeLength);
382 std::unique_ptr<uint8_t[]> array(buf);
383 info->transport->write(buf, info->writeLength);
384 info->transport->flush();
385
386 delete info;
387 }
388
alarm_handler_wrapper()389 void alarm_handler_wrapper() {
390 int64_t timeout = 0; // timeout of 0 means wait forever
391 while (true) {
392 bool fireHandler = false;
393 {
394 apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
395 if (g_teardown)
396 return;
397 // calculate timeout
398 if (g_triggerInfo == nullptr) {
399 timeout = 0;
400 } else {
401 timeout = g_triggerInfo->timeoutSeconds * 1000;
402 }
403
404 int waitResult = g_alarm_monitor.waitForTimeRelative(timeout);
405 if (waitResult == THRIFT_ETIMEDOUT)
406 fireHandler = true;
407 }
408 if (fireHandler)
409 alarm_handler(); // calling outside the lock
410 }
411 }
412
413 /**
414 * Add a trigger to be scheduled "seconds" seconds after the
415 * last currently scheduled trigger.
416 *
417 * (Note that this is not "seconds" from now. That might be more logical, but
418 * would require slightly more complicated sorting, rather than just appending
419 * to the end.)
420 */
add_trigger(unsigned int seconds,const std::shared_ptr<TTransport> & transport,uint32_t write_len)421 void add_trigger(unsigned int seconds,
422 const std::shared_ptr<TTransport>& transport,
423 uint32_t write_len) {
424 auto* info = new TriggerInfo(seconds, transport, write_len);
425 {
426 apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
427 if (g_triggerInfo == nullptr) {
428 // This is the first trigger.
429 // Set g_triggerInfo, and schedule the alarm
430 g_triggerInfo = info;
431 g_alarm_monitor.notify();
432 } else {
433 // Add this trigger to the end of the list
434 TriggerInfo* prev = g_triggerInfo;
435 while (prev->next) {
436 prev = prev->next;
437 }
438 prev->next = info;
439 }
440 }
441 }
442
clear_triggers()443 void clear_triggers() {
444 TriggerInfo* info = nullptr;
445
446 {
447 apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
448 info = g_triggerInfo;
449 g_triggerInfo = nullptr;
450 g_numTriggersFired = 0;
451 g_alarm_monitor.notify();
452 }
453
454 while (info != nullptr) {
455 TriggerInfo* next = info->next;
456 delete info;
457 info = next;
458 }
459 }
460
set_trigger(unsigned int seconds,const std::shared_ptr<TTransport> & transport,uint32_t write_len)461 void set_trigger(unsigned int seconds,
462 const std::shared_ptr<TTransport>& transport,
463 uint32_t write_len) {
464 clear_triggers();
465 add_trigger(seconds, transport, write_len);
466 }
467
468 /**************************************************************************
469 * Test functions
470 **************************************************************************/
471
472 /**
473 * Test interleaved write and read calls.
474 *
475 * Generates a buffer totalSize bytes long, then writes it to the transport,
476 * and verifies the written data can be read back correctly.
477 *
478 * Mode of operation:
479 * - call wChunkGenerator to figure out how large of a chunk to write
480 * - call wSizeGenerator to get the size for individual write() calls,
481 * and do this repeatedly until the entire chunk is written.
482 * - call rChunkGenerator to figure out how large of a chunk to read
483 * - call rSizeGenerator to get the size for individual read() calls,
484 * and do this repeatedly until the entire chunk is read.
485 * - repeat until the full buffer is written and read back,
486 * then compare the data read back against the original buffer
487 *
488 *
489 * - If any of the size generators return 0, this means to use the maximum
490 * possible size.
491 *
492 * - If maxOutstanding is non-zero, write chunk sizes will be chosen such that
493 * there are never more than maxOutstanding bytes waiting to be read back.
494 */
495 template <class CoupledTransports>
test_rw(uint32_t totalSize,SizeGenerator & wSizeGenerator,SizeGenerator & rSizeGenerator,SizeGenerator & wChunkGenerator,SizeGenerator & rChunkGenerator,uint32_t maxOutstanding)496 void test_rw(uint32_t totalSize,
497 SizeGenerator& wSizeGenerator,
498 SizeGenerator& rSizeGenerator,
499 SizeGenerator& wChunkGenerator,
500 SizeGenerator& rChunkGenerator,
501 uint32_t maxOutstanding) {
502 CoupledTransports transports;
503 BOOST_REQUIRE(transports.in != nullptr);
504 BOOST_REQUIRE(transports.out != nullptr);
505
506 boost::shared_array<uint8_t> wbuf = boost::shared_array<uint8_t>(new uint8_t[totalSize]);
507 boost::shared_array<uint8_t> rbuf = boost::shared_array<uint8_t>(new uint8_t[totalSize]);
508
509 // store some data in wbuf
510 for (uint32_t n = 0; n < totalSize; ++n) {
511 wbuf[n] = (n & 0xff);
512 }
513 // clear rbuf
514 memset(rbuf.get(), 0, totalSize);
515
516 uint32_t total_written = 0;
517 uint32_t total_read = 0;
518 while (total_read < totalSize) {
519 // Determine how large a chunk of data to write
520 uint32_t wchunk_size = wChunkGenerator.nextSize();
521 if (wchunk_size == 0 || wchunk_size > totalSize - total_written) {
522 wchunk_size = totalSize - total_written;
523 }
524
525 // Make sure (total_written - total_read) + wchunk_size
526 // is less than maxOutstanding
527 if (maxOutstanding > 0 && wchunk_size > maxOutstanding - (total_written - total_read)) {
528 wchunk_size = maxOutstanding - (total_written - total_read);
529 }
530
531 // Write the chunk
532 uint32_t chunk_written = 0;
533 while (chunk_written < wchunk_size) {
534 uint32_t write_size = wSizeGenerator.nextSize();
535 if (write_size == 0 || write_size > wchunk_size - chunk_written) {
536 write_size = wchunk_size - chunk_written;
537 }
538
539 try {
540 transports.out->write(wbuf.get() + total_written, write_size);
541 } catch (TTransportException& te) {
542 if (te.getType() == TTransportException::TIMED_OUT)
543 break;
544 throw te;
545 }
546 chunk_written += write_size;
547 total_written += write_size;
548 }
549
550 // Flush the data, so it will be available in the read transport
551 // Don't flush if wchunk_size is 0. (This should only happen if
552 // total_written == totalSize already, and we're only reading now.)
553 if (wchunk_size > 0) {
554 transports.out->flush();
555 }
556
557 // Determine how large a chunk of data to read back
558 uint32_t rchunk_size = rChunkGenerator.nextSize();
559 if (rchunk_size == 0 || rchunk_size > total_written - total_read) {
560 rchunk_size = total_written - total_read;
561 }
562
563 // Read the chunk
564 uint32_t chunk_read = 0;
565 while (chunk_read < rchunk_size) {
566 uint32_t read_size = rSizeGenerator.nextSize();
567 if (read_size == 0 || read_size > rchunk_size - chunk_read) {
568 read_size = rchunk_size - chunk_read;
569 }
570
571 int bytes_read = -1;
572 try {
573 bytes_read = transports.in->read(rbuf.get() + total_read, read_size);
574 } catch (TTransportException& e) {
575 BOOST_FAIL("read(pos=" << total_read << ", size=" << read_size << ") threw exception \""
576 << e.what() << "\"; written so far: " << total_written << " / "
577 << totalSize << " bytes");
578 }
579
580 BOOST_REQUIRE_MESSAGE(bytes_read > 0,
581 "read(pos=" << total_read << ", size=" << read_size << ") returned "
582 << bytes_read << "; written so far: " << total_written
583 << " / " << totalSize << " bytes");
584 chunk_read += bytes_read;
585 total_read += bytes_read;
586 }
587 }
588
589 // make sure the data read back is identical to the data written
590 BOOST_CHECK_EQUAL(memcmp(rbuf.get(), wbuf.get(), totalSize), 0);
591 }
592
593 template <class CoupledTransports>
test_read_part_available()594 void test_read_part_available() {
595 CoupledTransports transports;
596 BOOST_REQUIRE(transports.in != nullptr);
597 BOOST_REQUIRE(transports.out != nullptr);
598
599 uint8_t write_buf[16];
600 uint8_t read_buf[16];
601 memset(write_buf, 'a', sizeof(write_buf));
602
603 // Attemping to read 10 bytes when only 9 are available should return 9
604 // immediately.
605 transports.out->write(write_buf, 9);
606 transports.out->flush();
607 set_trigger(3, transports.out, 1);
608 uint32_t bytes_read = transports.in->read(read_buf, 10);
609 BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0);
610 BOOST_CHECK_EQUAL(bytes_read, (uint32_t)9);
611
612 clear_triggers();
613 }
614
615 template <class CoupledTransports>
test_read_part_available_in_chunks()616 void test_read_part_available_in_chunks() {
617 CoupledTransports transports;
618 BOOST_REQUIRE(transports.in != nullptr);
619 BOOST_REQUIRE(transports.out != nullptr);
620
621 uint8_t write_buf[16];
622 uint8_t read_buf[16];
623 memset(write_buf, 'a', sizeof(write_buf));
624
625 // Write 10 bytes (in a single frame, for transports that use framing)
626 transports.out->write(write_buf, 10);
627 transports.out->flush();
628
629 // Read 1 byte, to force the transport to read the frame
630 uint32_t bytes_read = transports.in->read(read_buf, 1);
631 BOOST_CHECK_EQUAL(bytes_read, 1u);
632
633 // Read more than what is remaining and verify the transport does not block
634 set_trigger(3, transports.out, 1);
635 bytes_read = transports.in->read(read_buf, 10);
636 BOOST_CHECK_EQUAL(g_numTriggersFired, 0u);
637 BOOST_CHECK_EQUAL(bytes_read, 9u);
638
639 clear_triggers();
640 }
641
642 template <class CoupledTransports>
test_read_partial_midframe()643 void test_read_partial_midframe() {
644 CoupledTransports transports;
645 BOOST_REQUIRE(transports.in != nullptr);
646 BOOST_REQUIRE(transports.out != nullptr);
647
648 uint8_t write_buf[16];
649 uint8_t read_buf[16];
650 memset(write_buf, 'a', sizeof(write_buf));
651
652 // Attempt to read 10 bytes, when only 9 are available, but after we have
653 // already read part of the data that is available. This exercises a
654 // different code path for several of the transports.
655 //
656 // For transports that add their own framing (e.g., TFramedTransport and
657 // TFileTransport), the two flush calls break up the data in to a 10 byte
658 // frame and a 3 byte frame. The first read then puts us partway through the
659 // first frame, and then we attempt to read past the end of that frame, and
660 // through the next frame, too.
661 //
662 // For buffered transports that perform read-ahead (e.g.,
663 // TBufferedTransport), the read-ahead will most likely see all 13 bytes
664 // written on the first read. The next read will then attempt to read past
665 // the end of the read-ahead buffer.
666 //
667 // Flush 10 bytes, then 3 bytes. This creates 2 separate frames for
668 // transports that track framing internally.
669 transports.out->write(write_buf, 10);
670 transports.out->flush();
671 transports.out->write(write_buf, 3);
672 transports.out->flush();
673
674 // Now read 4 bytes, so that we are partway through the written data.
675 uint32_t bytes_read = transports.in->read(read_buf, 4);
676 BOOST_CHECK_EQUAL(bytes_read, (uint32_t)4);
677
678 // Now attempt to read 10 bytes. Only 9 more are available.
679 //
680 // We should be able to get all 9 bytes, but it might take multiple read
681 // calls, since it is valid for read() to return fewer bytes than requested.
682 // (Most transports do immediately return 9 bytes, but the framing transports
683 // tend to only return to the end of the current frame, which is 6 bytes in
684 // this case.)
685 uint32_t total_read = 0;
686 while (total_read < 9) {
687 set_trigger(3, transports.out, 1);
688 bytes_read = transports.in->read(read_buf, 10);
689 BOOST_REQUIRE_EQUAL(g_numTriggersFired, (unsigned int)0);
690 BOOST_REQUIRE_GT(bytes_read, (uint32_t)0);
691 total_read += bytes_read;
692 BOOST_REQUIRE_LE(total_read, (uint32_t)9);
693 }
694
695 BOOST_CHECK_EQUAL(total_read, (uint32_t)9);
696
697 clear_triggers();
698 }
699
700 template <class CoupledTransports>
test_borrow_part_available()701 void test_borrow_part_available() {
702 CoupledTransports transports;
703 BOOST_REQUIRE(transports.in != nullptr);
704 BOOST_REQUIRE(transports.out != nullptr);
705
706 uint8_t write_buf[16];
707 uint8_t read_buf[16];
708 memset(write_buf, 'a', sizeof(write_buf));
709
710 // Attemping to borrow 10 bytes when only 9 are available should return nullptr
711 // immediately.
712 transports.out->write(write_buf, 9);
713 transports.out->flush();
714 set_trigger(3, transports.out, 1);
715 uint32_t borrow_len = 10;
716 const uint8_t* borrowed_buf = transports.in->borrow(read_buf, &borrow_len);
717 BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0);
718 BOOST_CHECK(borrowed_buf == nullptr);
719
720 clear_triggers();
721 }
722
723 template <class CoupledTransports>
test_read_none_available()724 void test_read_none_available() {
725 CoupledTransports transports;
726 BOOST_REQUIRE(transports.in != nullptr);
727 BOOST_REQUIRE(transports.out != nullptr);
728
729 uint8_t read_buf[16];
730
731 // Attempting to read when no data is available should either block until
732 // some data is available, or fail immediately. (e.g., TSocket blocks,
733 // TMemoryBuffer just fails.)
734 //
735 // If the transport blocks, it should succeed once some data is available,
736 // even if less than the amount requested becomes available.
737 set_trigger(1, transports.out, 2);
738 add_trigger(1, transports.out, 8);
739 uint32_t bytes_read = transports.in->read(read_buf, 10);
740 if (bytes_read == 0) {
741 BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0);
742 clear_triggers();
743 } else {
744 BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)1);
745 BOOST_CHECK_EQUAL(bytes_read, (uint32_t)2);
746 }
747
748 clear_triggers();
749 }
750
751 template <class CoupledTransports>
test_borrow_none_available()752 void test_borrow_none_available() {
753 CoupledTransports transports;
754 BOOST_REQUIRE(transports.in != nullptr);
755 BOOST_REQUIRE(transports.out != nullptr);
756
757 uint8_t write_buf[16];
758 memset(write_buf, 'a', sizeof(write_buf));
759
760 // Attempting to borrow when no data is available should fail immediately
761 set_trigger(1, transports.out, 10);
762 uint32_t borrow_len = 10;
763 const uint8_t* borrowed_buf = transports.in->borrow(nullptr, &borrow_len);
764 BOOST_CHECK(borrowed_buf == nullptr);
765 BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0);
766
767 clear_triggers();
768 }
769
770 /**************************************************************************
771 * Test case generation
772 *
773 * Pretty ugly and annoying. This would be much easier if we the unit test
774 * framework didn't force each test to be a separate function.
775 * - Writing a completely separate function definition for each of these would
776 * result in a lot of repetitive boilerplate code.
777 * - Combining many tests into a single function makes it more difficult to
778 * tell precisely which tests failed. It also means you can't get a progress
779 * update after each test, and the tests are already fairly slow.
780 * - Similar registration could be achieved with BOOST_TEST_CASE_TEMPLATE,
781 * but it requires a lot of awkward MPL code, and results in useless test
782 * case names. (The names are generated from std::type_info::name(), which
783 * is compiler-dependent. gcc returns mangled names.)
784 **************************************************************************/
785
786 #define ADD_TEST_RW(CoupledTransports, totalSize, ...) \
787 addTestRW<CoupledTransports>(BOOST_STRINGIZE(CoupledTransports), totalSize, ##__VA_ARGS__);
788
789 #define TEST_RW(CoupledTransports, totalSize, ...) \
790 do { \
791 /* Add the test as specified, to test the non-virtual function calls */ \
792 ADD_TEST_RW(CoupledTransports, totalSize, ##__VA_ARGS__); \
793 /* \
794 * Also test using the transport as a TTransport*, to test \
795 * the read_virt()/write_virt() calls \
796 */ \
797 ADD_TEST_RW(CoupledTTransports<CoupledTransports>, totalSize, ##__VA_ARGS__); \
798 /* Test wrapping the transport with TBufferedTransport */ \
799 ADD_TEST_RW(CoupledBufferedTransportsT<CoupledTransports>, totalSize, ##__VA_ARGS__); \
800 /* Test wrapping the transport with TFramedTransports */ \
801 ADD_TEST_RW(CoupledFramedTransportsT<CoupledTransports>, totalSize, ##__VA_ARGS__); \
802 /* Test wrapping the transport with TZlibTransport */ \
803 ADD_TEST_RW(CoupledZlibTransportsT<CoupledTransports>, totalSize, ##__VA_ARGS__); \
804 } while (0)
805
806 #define ADD_TEST_BLOCKING(CoupledTransports) \
807 addTestBlocking<CoupledTransports>(BOOST_STRINGIZE(CoupledTransports));
808
809 #define TEST_BLOCKING_BEHAVIOR(CoupledTransports) \
810 ADD_TEST_BLOCKING(CoupledTransports); \
811 ADD_TEST_BLOCKING(CoupledTTransports<CoupledTransports>); \
812 ADD_TEST_BLOCKING(CoupledBufferedTransportsT<CoupledTransports>); \
813 ADD_TEST_BLOCKING(CoupledFramedTransportsT<CoupledTransports>); \
814 ADD_TEST_BLOCKING(CoupledZlibTransportsT<CoupledTransports>);
815
816 class TransportTestGen {
817 public:
TransportTestGen(boost::unit_test::test_suite * suite,float sizeMultiplier)818 TransportTestGen(boost::unit_test::test_suite* suite, float sizeMultiplier)
819 : suite_(suite), sizeMultiplier_(sizeMultiplier) {}
820
generate()821 void generate() {
822 GenericSizeGenerator rand4k(1, 4096);
823
824 /*
825 * We do the basically the same set of tests for each transport type,
826 * although we tweak the parameters in some places.
827 */
828
829 // TMemoryBuffer tests
830 TEST_RW(CoupledMemoryBuffers, 1024 * 1024, 0, 0);
831 TEST_RW(CoupledMemoryBuffers, 1024 * 256, rand4k, rand4k);
832 TEST_RW(CoupledMemoryBuffers, 1024 * 256, 167, 163);
833 TEST_RW(CoupledMemoryBuffers, 1024 * 16, 1, 1);
834
835 TEST_RW(CoupledMemoryBuffers, 1024 * 256, 0, 0, rand4k, rand4k);
836 TEST_RW(CoupledMemoryBuffers, 1024 * 256, rand4k, rand4k, rand4k, rand4k);
837 TEST_RW(CoupledMemoryBuffers, 1024 * 256, 167, 163, rand4k, rand4k);
838 TEST_RW(CoupledMemoryBuffers, 1024 * 16, 1, 1, rand4k, rand4k);
839
840 TEST_BLOCKING_BEHAVIOR(CoupledMemoryBuffers);
841
842 #ifndef _WIN32
843 // TFDTransport tests
844 // Since CoupledFDTransports tests with a pipe, writes will block
845 // if there is too much outstanding unread data in the pipe.
846 uint32_t fd_max_outstanding = 4096;
847 TEST_RW(CoupledFDTransports, 1024 * 1024, 0, 0, 0, 0, fd_max_outstanding);
848 TEST_RW(CoupledFDTransports, 1024 * 256, rand4k, rand4k, 0, 0, fd_max_outstanding);
849 TEST_RW(CoupledFDTransports, 1024 * 256, 167, 163, 0, 0, fd_max_outstanding);
850 TEST_RW(CoupledFDTransports, 1024 * 16, 1, 1, 0, 0, fd_max_outstanding);
851
852 TEST_RW(CoupledFDTransports, 1024 * 256, 0, 0, rand4k, rand4k, fd_max_outstanding);
853 TEST_RW(CoupledFDTransports, 1024 * 256, rand4k, rand4k, rand4k, rand4k, fd_max_outstanding);
854 TEST_RW(CoupledFDTransports, 1024 * 256, 167, 163, rand4k, rand4k, fd_max_outstanding);
855 TEST_RW(CoupledFDTransports, 1024 * 16, 1, 1, rand4k, rand4k, fd_max_outstanding);
856
857 TEST_BLOCKING_BEHAVIOR(CoupledFDTransports);
858 #else
859 // TPipe tests (WIN32 only)
860 TEST_RW(CoupledPipeTransports, 1024 * 1024, 0, 0);
861 TEST_RW(CoupledPipeTransports, 1024 * 256, rand4k, rand4k);
862 TEST_RW(CoupledPipeTransports, 1024 * 256, 167, 163);
863 TEST_RW(CoupledPipeTransports, 1024 * 16, 1, 1);
864
865 TEST_RW(CoupledPipeTransports, 1024 * 256, 0, 0, rand4k, rand4k);
866 TEST_RW(CoupledPipeTransports, 1024 * 256, rand4k, rand4k, rand4k, rand4k);
867 TEST_RW(CoupledPipeTransports, 1024 * 256, 167, 163, rand4k, rand4k);
868 TEST_RW(CoupledPipeTransports, 1024 * 16, 1, 1, rand4k, rand4k);
869
870 TEST_BLOCKING_BEHAVIOR(CoupledPipeTransports);
871 #endif //_WIN32
872
873 // TSocket tests
874 uint32_t socket_max_outstanding = 4096;
875 TEST_RW(CoupledSocketTransports, 1024 * 1024, 0, 0, 0, 0, socket_max_outstanding);
876 TEST_RW(CoupledSocketTransports, 1024 * 256, rand4k, rand4k, 0, 0, socket_max_outstanding);
877 TEST_RW(CoupledSocketTransports, 1024 * 256, 167, 163, 0, 0, socket_max_outstanding);
878 // Doh. Apparently writing to a socket has some additional overhead for
879 // each send() call. If we have more than ~400 outstanding 1-byte write
880 // requests, additional send() calls start blocking.
881 TEST_RW(CoupledSocketTransports, 1024 * 16, 1, 1, 0, 0, socket_max_outstanding);
882 TEST_RW(CoupledSocketTransports, 1024 * 256, 0, 0, rand4k, rand4k, socket_max_outstanding);
883 TEST_RW(CoupledSocketTransports,
884 1024 * 256,
885 rand4k,
886 rand4k,
887 rand4k,
888 rand4k,
889 socket_max_outstanding);
890 TEST_RW(CoupledSocketTransports, 1024 * 256, 167, 163, rand4k, rand4k, socket_max_outstanding);
891 TEST_RW(CoupledSocketTransports, 1024 * 16, 1, 1, rand4k, rand4k, socket_max_outstanding);
892
893 TEST_BLOCKING_BEHAVIOR(CoupledSocketTransports);
894
895 // These could be made to work on Windows, but I don't care enough to make it happen
896 #ifndef _WIN32
897 // TFileTransport tests
898 // We use smaller buffer sizes here, since TFileTransport is fairly slow.
899 //
900 // TFileTransport can't write more than 16MB at once
901 uint32_t max_write_at_once = 1024 * 1024 * 16 - 4;
902 TEST_RW(CoupledFileTransports, 1024 * 1024, max_write_at_once, 0);
903 TEST_RW(CoupledFileTransports, 1024 * 128, rand4k, rand4k);
904 TEST_RW(CoupledFileTransports, 1024 * 128, 167, 163);
905 TEST_RW(CoupledFileTransports, 1024 * 2, 1, 1);
906
907 TEST_RW(CoupledFileTransports, 1024 * 64, 0, 0, rand4k, rand4k);
908 TEST_RW(CoupledFileTransports, 1024 * 64, rand4k, rand4k, rand4k, rand4k);
909 TEST_RW(CoupledFileTransports, 1024 * 64, 167, 163, rand4k, rand4k);
910 TEST_RW(CoupledFileTransports, 1024 * 2, 1, 1, rand4k, rand4k);
911
912 TEST_BLOCKING_BEHAVIOR(CoupledFileTransports);
913 #endif
914
915 // Add some tests that access TBufferedTransport and TFramedTransport
916 // via TTransport pointers and TBufferBase pointers.
917 ADD_TEST_RW(CoupledTTransports<CoupledBufferedTransports>,
918 1024 * 1024,
919 rand4k,
920 rand4k,
921 rand4k,
922 rand4k);
923 ADD_TEST_RW(CoupledBufferBases<CoupledBufferedTransports>,
924 1024 * 1024,
925 rand4k,
926 rand4k,
927 rand4k,
928 rand4k);
929 ADD_TEST_RW(CoupledTTransports<CoupledFramedTransports>,
930 1024 * 1024,
931 rand4k,
932 rand4k,
933 rand4k,
934 rand4k);
935 ADD_TEST_RW(CoupledBufferBases<CoupledFramedTransports>,
936 1024 * 1024,
937 rand4k,
938 rand4k,
939 rand4k,
940 rand4k);
941
942 // Test using TZlibTransport via a TTransport pointer
943 ADD_TEST_RW(CoupledTTransports<CoupledZlibTransports>,
944 1024 * 1024,
945 rand4k,
946 rand4k,
947 rand4k,
948 rand4k);
949 }
950
951 #if (BOOST_VERSION >= 105900)
952 #define MAKE_TEST_CASE(_FUNC, _NAME) boost::unit_test::make_test_case(_FUNC, _NAME, __FILE__, __LINE__)
953 #else
954 #define MAKE_TEST_CASE(_FUNC, _NAME) boost::unit_test::make_test_case(_FUNC, _NAME)
955 #endif
956
957 private:
958 template <class CoupledTransports>
addTestRW(const char * transport_name,uint32_t totalSize,GenericSizeGenerator wSizeGen,GenericSizeGenerator rSizeGen,GenericSizeGenerator wChunkSizeGen=0,GenericSizeGenerator rChunkSizeGen=0,uint32_t maxOutstanding=0,uint32_t expectedFailures=0)959 void addTestRW(const char* transport_name,
960 uint32_t totalSize,
961 GenericSizeGenerator wSizeGen,
962 GenericSizeGenerator rSizeGen,
963 GenericSizeGenerator wChunkSizeGen = 0,
964 GenericSizeGenerator rChunkSizeGen = 0,
965 uint32_t maxOutstanding = 0,
966 uint32_t expectedFailures = 0) {
967 // adjust totalSize by the specified sizeMultiplier_ first
968 totalSize = static_cast<uint32_t>(totalSize * sizeMultiplier_);
969
970 std::ostringstream name;
971 name << transport_name << "::test_rw(" << totalSize << ", " << wSizeGen.describe() << ", "
972 << rSizeGen.describe() << ", " << wChunkSizeGen.describe() << ", "
973 << rChunkSizeGen.describe() << ", " << maxOutstanding << ")";
974
975 #if (BOOST_VERSION >= 105900)
976 std::function<void ()> test_func
977 #else
978 boost::unit_test::callback0<> test_func
979 #endif
980 = std::bind(test_rw<CoupledTransports>,
981 totalSize,
982 wSizeGen,
983 rSizeGen,
984 wChunkSizeGen,
985 rChunkSizeGen,
986 maxOutstanding);
987 suite_->add(MAKE_TEST_CASE(test_func, name.str()), expectedFailures);
988 }
989
990 template <class CoupledTransports>
addTestBlocking(const char * transportName,uint32_t expectedFailures=0)991 void addTestBlocking(const char* transportName, uint32_t expectedFailures = 0) {
992 char name[1024];
993
994 THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_part_available()", transportName);
995 suite_->add(MAKE_TEST_CASE(test_read_part_available<CoupledTransports>, name), expectedFailures);
996
997 THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_part_available_in_chunks()", transportName);
998 suite_->add(MAKE_TEST_CASE(test_read_part_available_in_chunks<CoupledTransports>, name), expectedFailures);
999
1000 THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_partial_midframe()", transportName);
1001 suite_->add(MAKE_TEST_CASE(test_read_partial_midframe<CoupledTransports>, name), expectedFailures);
1002
1003 THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_none_available()", transportName);
1004 suite_->add(MAKE_TEST_CASE(test_read_none_available<CoupledTransports>, name), expectedFailures);
1005
1006 THRIFT_SNPRINTF(name, sizeof(name), "%s::test_borrow_part_available()", transportName);
1007 suite_->add(MAKE_TEST_CASE(test_borrow_part_available<CoupledTransports>, name), expectedFailures);
1008
1009 THRIFT_SNPRINTF(name, sizeof(name), "%s::test_borrow_none_available()", transportName);
1010 suite_->add(MAKE_TEST_CASE(test_borrow_none_available<CoupledTransports>, name), expectedFailures);
1011 }
1012
1013 boost::unit_test::test_suite* suite_;
1014 // sizeMultiplier_ is configurable via the command line, and allows the
1015 // user to adjust between smaller buffers that can be tested quickly,
1016 // or larger buffers that more thoroughly exercise the code, but take
1017 // longer.
1018 float sizeMultiplier_;
1019 };
1020
1021 /**************************************************************************
1022 * General Initialization
1023 **************************************************************************/
1024
1025 struct global_fixture {
1026 std::shared_ptr<apache::thrift::concurrency::Thread> alarmThread_;
global_fixtureglobal_fixture1027 global_fixture() {
1028 #if _WIN32
1029 apache::thrift::transport::TWinsockSingleton::create();
1030 #endif
1031
1032 apache::thrift::concurrency::ThreadFactory factory;
1033 factory.setDetached(false);
1034
1035 alarmThread_ = factory.newThread(
1036 apache::thrift::concurrency::FunctionRunner::create(alarm_handler_wrapper));
1037 alarmThread_->start();
1038 }
~global_fixtureglobal_fixture1039 ~global_fixture() {
1040 {
1041 apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
1042 g_teardown = true;
1043 g_alarm_monitor.notify();
1044 }
1045 alarmThread_->join();
1046 }
1047 };
1048
1049 #if (BOOST_VERSION >= 105900)
1050 BOOST_GLOBAL_FIXTURE(global_fixture);
1051 #else
BOOST_GLOBAL_FIXTURE(global_fixture)1052 BOOST_GLOBAL_FIXTURE(global_fixture)
1053 #endif
1054
1055 #ifdef BOOST_TEST_DYN_LINK
1056 bool init_unit_test_suite() {
1057 struct timeval tv;
1058 THRIFT_GETTIMEOFDAY(&tv, nullptr);
1059 int seed = tv.tv_sec ^ tv.tv_usec;
1060
1061 initrand(seed);
1062
1063 boost::unit_test::test_suite* suite = &boost::unit_test::framework::master_test_suite();
1064 suite->p_name.value = "TransportTest";
1065 TransportTestGen transport_test_generator(suite, 1);
1066 transport_test_generator.generate();
1067 return true;
1068 }
1069
main(int argc,char * argv[])1070 int main( int argc, char* argv[] ) {
1071 return ::boost::unit_test::unit_test_main(&init_unit_test_suite,argc,argv);
1072 }
1073 #else
1074 boost::unit_test::test_suite* init_unit_test_suite(int argc, char* argv[]) {
1075 THRIFT_UNUSED_VARIABLE(argc);
1076 THRIFT_UNUSED_VARIABLE(argv);
1077 struct timeval tv;
1078 THRIFT_GETTIMEOFDAY(&tv, nullptr);
1079 int seed = tv.tv_sec ^ tv.tv_usec;
1080
1081 initrand(seed);
1082
1083 boost::unit_test::test_suite* suite = &boost::unit_test::framework::master_test_suite();
1084 suite->p_name.value = "TransportTest";
1085 TransportTestGen transport_test_generator(suite, 1);
1086 transport_test_generator.generate();
1087 return nullptr;
1088 }
1089 #endif
1090