1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 /**
21 * Exercises various transports, combined with the buffered/framed wrappers.
22 *
23 * Originally ported from the C++ version, with Windows support code added.
24 */
25 module transport_test;
26
27 import core.atomic;
28 import core.time : Duration;
29 import core.thread : Thread;
30 import std.conv : to;
31 import std.datetime;
32 import std.exception : enforce;
33 static import std.file;
34 import std.getopt;
35 import std.random : rndGen, uniform, unpredictableSeed;
36 import std.socket;
37 import std.stdio;
38 import std.string;
39 import std.typetuple;
40 import thrift.transport.base;
41 import thrift.transport.buffered;
42 import thrift.transport.framed;
43 import thrift.transport.file;
44 import thrift.transport.http;
45 import thrift.transport.memory;
46 import thrift.transport.socket;
47 import thrift.transport.zlib;
48
49 /*
50 * Size generation helpers – used to be able to run the same testing code
51 * with both constant and random total/chunk sizes.
52 */
53
/// Source of sizes for the read/write tests.
interface SizeGenerator {
  /// Returns the next size to use.
  size_t nextSize();

  /// Returns a textual description of the generator.
  string toString();
}
58
/// Size generator which yields the same, fixed value on every call.
class ConstantSizeGenerator : SizeGenerator {
  /// Params: value = the size handed out by every nextSize() call.
  this(size_t value) {
    this.value_ = value;
  }

  /// Returns the size passed to the constructor.
  override size_t nextSize() {
    return this.value_;
  }

  /// Returns the string form of the fixed size.
  override string toString() const {
    string description = to!string(value_);
    return description;
  }

private:
  size_t value_;
}
75
/// Size generator which draws every size at random from [min, max].
class RandomSizeGenerator : SizeGenerator {
  /// Params:
  ///   min = lower bound passed to uniform!"[]".
  ///   max = upper bound passed to uniform!"[]".
  this(size_t min, size_t max) {
    this.min_ = min;
    this.max_ = max;
  }

  /// Returns a fresh value drawn via uniform!"[]"(min, max).
  override size_t nextSize() {
    immutable drawn = uniform!"[]"(min_, max_);
    return drawn;
  }

  /// Returns a description of the form "rand(min, max)".
  override string toString() const {
    return format("rand(%s, %s)", min_, max_);
  }

  /// The lower bound given to the constructor.
  @property size_t min() const {
    return min_;
  }

  /// The upper bound given to the constructor.
  @property size_t max() const {
    return max_;
  }

private:
  size_t min_;
  size_t max_;
}
102
103
104 /*
105 * Classes to set up coupled transports
106 */
107
/**
 * Represents a coupled pair of transports: whatever is written to the
 * output transport can be read back from the input transport.
 *
 * This only serves as the base class for the concrete coupled transport
 * implementations below and shouldn't be used directly.
 */
class CoupledTransports(Transport) if (isTTransport!Transport) {
  /// The reading end of the pair.
  Transport input;

  /// The writing end of the pair.
  Transport output;
}
120
/// Evaluates to true if T is implicitly convertible to some instance of
/// CoupledTransports, and to false otherwise.
template isCoupledTransports(T) {
  static if (!is(T _ : CoupledTransports!U, U)) {
    enum isCoupledTransports = false;
  } else {
    enum isCoupledTransports = true;
  }
}
128
/**
 * Coupled transports which wrap both ends of another coupled pair in a
 * WrapperTransport.
 *
 * The inner pair is created by the constructor and destroyed again by the
 * destructor.
 */
class CoupledWrapperTransports(WrapperTransport, InnerCoupledTransports) if (
  isTTransport!WrapperTransport && isCoupledTransports!InnerCoupledTransports
) : CoupledTransports!WrapperTransport {
  this() {
    inner_ = new InnerCoupledTransports();

    // Only wrap the ends the inner pair actually provides.
    auto innerInput = inner_.input;
    if (innerInput) {
      input = new WrapperTransport(innerInput);
    }

    auto innerOutput = inner_.output;
    if (innerOutput) {
      output = new WrapperTransport(innerOutput);
    }
  }

  ~this() {
    destroy(inner_);
  }

private:
  InnerCoupledTransports inner_;
}
153
import thrift.internal.codegen : PApply;

/// CoupledWrapperTransports with TBufferedTransport fixed as the wrapper.
alias PApply!(CoupledWrapperTransports, TBufferedTransport) CoupledBufferedTransports;

/// CoupledWrapperTransports with TFramedTransport fixed as the wrapper.
alias PApply!(CoupledWrapperTransports, TFramedTransport) CoupledFramedTransports;

/// CoupledWrapperTransports with TZlibTransport fixed as the wrapper.
alias PApply!(CoupledWrapperTransports, TZlibTransport) CoupledZlibTransports;
158
/**
 * Coupled TMemoryBuffers: a single buffer serves as both the input and the
 * output end.
 */
class CoupledMemoryBuffers : CoupledTransports!TMemoryBuffer {
  this() {
    input = output = buf = new TMemoryBuffer;
  }

  /// The buffer shared by input and output.
  TMemoryBuffer buf;
}
171
/**
 * Coupled TSockets, built on the two ends of a socketPair().
 */
class CoupledSocketTransports : CoupledTransports!TSocket {
  this() {
    auto ends = socketPair();
    input = new TSocket(ends[0]);
    output = new TSocket(ends[1]);
  }

  /// Closes both sockets.
  ~this() {
    input.close();
    output.close();
  }
}
187
/**
 * Coupled TFileTransports: a writer and a reader transport opened on the
 * same, freshly named file below tmpDir.
 */
class CoupledFileTransports : CoupledTransports!TTransport {
  this() {
    // We actually need the file name of the temp file here, so we can't just
    // use the usual tempfile facilities. Keep drawing random suffixes until
    // the name does not refer to an existing file.
    for (;;) {
      fileName_ = tmpDir ~ "/thrift.transport_test." ~ to!string(rndGen().front);
      rndGen().popFront();
      if (!std.file.exists(fileName_)) break;
    }

    writefln("Using temp file: %s", fileName_);

    auto fileWriter = new TFileWriterTransport(fileName_);
    fileWriter.open();
    output = fileWriter;

    // Wait until the file has been created.
    fileWriter.flush();

    auto fileReader = new TFileReaderTransport(fileName_);
    fileReader.open();
    fileReader.readTimeout(dur!"msecs"(-1));
    input = fileReader;
  }

  /// Closes both transports and deletes the temporary file.
  ~this() {
    input.close();
    output.close();
    std.file.remove(fileName_);
  }

  /// Directory in which the temporary files are created.
  static string tmpDir;

private:
  string fileName_;
}
226
227
228 /*
229 * Test functions
230 */
231
/**
 * Test interleaved write and read calls.
 *
 * Generates a buffer totalSize bytes long, then writes it to the transport,
 * and verifies the written data can be read back correctly.
 *
 * Mode of operation:
 * - call wChunkGenerator to figure out how large of a chunk to write
 *   - call wSizeGenerator to get the size for individual write() calls,
 *     and do this repeatedly until the entire chunk is written.
 * - call rChunkGenerator to figure out how large of a chunk to read
 *   - call rSizeGenerator to get the size for individual read() calls,
 *     and do this repeatedly until the entire chunk is read.
 * - repeat until the full buffer is written and read back,
 *   then compare the data read back against the original buffer
 *
 * - If any of the size generators return 0, this means to use the maximum
 *   possible size.
 *
 * - If maxOutstanding is non-zero, write chunk sizes will be chosen such that
 *   there are never more than maxOutstanding bytes waiting to be read back.
 *
 * Throws: Exception if a read throws a TTransportException, if a read
 *   returns no data, or if the data read back differs from the data written.
 */
void testReadWrite(CoupledTransports)(
  size_t totalSize,
  SizeGenerator wSizeGenerator,
  SizeGenerator rSizeGenerator,
  SizeGenerator wChunkGenerator,
  SizeGenerator rChunkGenerator,
  size_t maxOutstanding
) if (
  isCoupledTransports!CoupledTransports
) {
  scope transports = new CoupledTransports;
  assert(transports.input);
  assert(transports.output);

  auto wbuf = new ubyte[totalSize];
  auto rbuf = new ubyte[totalSize];

  // Store some data in wbuf.
  foreach (i, ref b; wbuf) {
    b = i & 0xff;
  }

  size_t totalWritten;
  size_t totalRead;
  while (totalRead < totalSize) {
    // Determine how large a chunk of data to write.
    auto wChunkSize = wChunkGenerator.nextSize();
    if (wChunkSize == 0 || wChunkSize > totalSize - totalWritten) {
      wChunkSize = totalSize - totalWritten;
    }

    // Make sure (totalWritten - totalRead) + wChunkSize does not exceed
    // maxOutstanding.
    if (maxOutstanding > 0 &&
        wChunkSize > maxOutstanding - (totalWritten - totalRead)) {
      wChunkSize = maxOutstanding - (totalWritten - totalRead);
    }

    // Write the chunk.
    size_t chunkWritten = 0;
    while (chunkWritten < wChunkSize) {
      auto writeSize = wSizeGenerator.nextSize();
      if (writeSize == 0 || writeSize > wChunkSize - chunkWritten) {
        writeSize = wChunkSize - chunkWritten;
      }

      transports.output.write(wbuf[totalWritten .. totalWritten + writeSize]);
      chunkWritten += writeSize;
      totalWritten += writeSize;
    }

    // Flush the data, so it will be available in the read transport
    // Don't flush if wChunkSize is 0. (This should only happen if
    // totalWritten == totalSize already, and we're only reading now.)
    if (wChunkSize > 0) {
      transports.output.flush();
    }

    // Determine how large a chunk of data to read back.
    auto rChunkSize = rChunkGenerator.nextSize();
    if (rChunkSize == 0 || rChunkSize > totalWritten - totalRead) {
      rChunkSize = totalWritten - totalRead;
    }

    // Read the chunk.
    size_t chunkRead;
    while (chunkRead < rChunkSize) {
      auto readSize = rSizeGenerator.nextSize();
      if (readSize == 0 || readSize > rChunkSize - chunkRead) {
        readSize = rChunkSize - chunkRead;
      }

      size_t bytesRead;
      try {
        bytesRead = transports.input.read(
          rbuf[totalRead .. totalRead + readSize]);
      } catch (TTransportException e) {
        throw new Exception(format(`read(pos = %s, size = %s) threw ` ~
          `exception "%s"; written so far: %s/%s bytes`, totalRead, readSize,
          e.msg, totalWritten, totalSize));
      }

      enforce(bytesRead > 0, format(`read(pos = %s, size = %s) returned %s; ` ~
        `written so far: %s/%s bytes`, totalRead, readSize, bytesRead,
        totalWritten, totalSize));

      chunkRead += bytesRead;
      totalRead += bytesRead;
    }
  }

  // make sure the data read back is identical to the data written
  if (rbuf != wbuf) {
    // Print (up to) the last four bytes of each buffer. The tail length is
    // clamped so that buffers shorter than four bytes do not cause an
    // out-of-range slice which would hide the actual mismatch.
    immutable tailLength = totalSize < 4 ? totalSize : 4;
    stderr.writefln("%s vs. %s", wbuf[$ - tailLength .. $],
      rbuf[$ - tailLength .. $]);
    stderr.writefln("rbuf: %s vs. wbuf: %s", rbuf.length, wbuf.length);
  }
  enforce(rbuf == wbuf);
}
353
/**
 * Checks that asking for more data than is available returns the available
 * part right away: 9 bytes are written and flushed, then 10 are requested.
 *
 * A Trigger which would write one more byte after three seconds is armed
 * while reading; the read must come back before it fires.
 */
void testReadPartAvailable(CoupledTransports)() if (
  isCoupledTransports!CoupledTransports
) {
  scope transports = new CoupledTransports;
  assert(transports.input);
  assert(transports.output);

  ubyte[10] outgoing = 'a';
  ubyte[10] incoming;

  // Attempting to read 10 bytes when only 9 are available should return 9
  // immediately.
  transports.output.write(outgoing[0 .. 9]);
  transports.output.flush();

  auto timer = Trigger(dur!"seconds"(3), transports.output, 1);
  auto received = transports.input.read(incoming);
  enforce(timer.fired == 0);
  enforce(received == 9);
}
374
/**
 * Checks reading past the available data after part of it has already been
 * consumed: 13 bytes are written in two flushes (10 + 3), 4 are read, and
 * then the remaining 9 must be readable without blocking.
 */
void testReadPartialMidframe(CoupledTransports)() if (
  isCoupledTransports!CoupledTransports
) {
  scope transports = new CoupledTransports;
  assert(transports.input);
  assert(transports.output);

  ubyte[13] outgoing = 'a';
  ubyte[14] incoming;

  // Attempt to read 10 bytes, when only 9 are available, but after we have
  // already read part of the data that is available. This exercises a
  // different code path for several of the transports.
  //
  // For transports that add their own framing (e.g., TFramedTransport and
  // TFileTransport), the two flush calls break up the data into a 10 byte
  // frame and a 3 byte frame. The first read then puts us partway through the
  // first frame, and then we attempt to read past the end of that frame, and
  // through the next frame, too.
  //
  // For buffered transports that perform read-ahead (e.g.,
  // TBufferedTransport), the read-ahead will most likely see all 13 bytes
  // written on the first read. The next read will then attempt to read past
  // the end of the read-ahead buffer.
  //
  // Flush 10 bytes, then 3 bytes. This creates 2 separate frames for
  // transports that track framing internally.
  transports.output.write(outgoing[0 .. 10]);
  transports.output.flush();
  transports.output.write(outgoing[10 .. 13]);
  transports.output.flush();

  // Now read 4 bytes, so that we are partway through the written data.
  auto received = transports.input.read(incoming[0 .. 4]);
  enforce(received == 4);

  // Now attempt to read 10 bytes. Only 9 more are available.
  //
  // We should be able to get all 9 bytes, but it might take multiple read
  // calls, since it is valid for read() to return fewer bytes than requested.
  // (Most transports do immediately return 9 bytes, but the framing transports
  // tend to only return to the end of the current frame, which is 6 bytes in
  // this case.)
  size_t remainderRead = 0;
  do {
    // A fresh timer per read; none of the reads may block until it fires.
    auto timer = Trigger(dur!"seconds"(3), transports.output, 1);
    received = transports.input.read(incoming[4 + remainderRead .. $]);
    enforce(timer.fired == 0);
    enforce(received > 0);
    remainderRead += received;
    enforce(remainderRead <= 9);
  } while (remainderRead < 9);

  enforce(remainderRead == 9);
}
430
/**
 * Checks that borrowing more data than is available fails right away:
 * 9 bytes are written and flushed, then a borrow of 10 bytes is attempted.
 */
void testBorrowPartAvailable(CoupledTransports)() if (
  isCoupledTransports!CoupledTransports
) {
  scope transports = new CoupledTransports;
  assert(transports.input);
  assert(transports.output);

  ubyte[9] outgoing = 'a';
  ubyte[10] scratch;

  // Attempting to borrow 10 bytes when only 9 are available should return
  // null immediately.
  transports.output.write(outgoing);
  transports.output.flush();

  auto timer = Trigger(dur!"seconds"(3), transports.output, 1);
  auto wanted = scratch.length;
  auto borrowed = transports.input.borrow(scratch.ptr, wanted);
  enforce(timer.fired == 0);
  enforce(borrowed is null);
}
452
/**
 * Checks the behavior of read() when nothing has been written yet.
 *
 * A Trigger writes 2 bytes after one second and 8 more bytes after another
 * second. The read must either return 0 before the first write happens, or
 * return exactly the 2 bytes of the first write.
 */
void testReadNoneAvailable(CoupledTransports)() if (
  isCoupledTransports!CoupledTransports
) {
  scope transports = new CoupledTransports;
  assert(transports.input);
  assert(transports.output);

  // Attempting to read when no data is available should either block until
  // some data is available, or fail immediately. (e.g., TSocket blocks,
  // TMemoryBuffer just fails.)
  //
  // If the transport blocks, it should succeed once some data is available,
  // even if less than the amount requested becomes available.
  ubyte[10] incoming;

  auto timer = Trigger(dur!"seconds"(1), transports.output, 2);
  timer.add(dur!"seconds"(1), transports.output, 8);

  auto received = transports.input.read(incoming);
  if (received != 0) {
    // Blocking transport: woken up by the first write only.
    enforce(timer.fired == 1);
    enforce(received == 2);
  } else {
    // Non-blocking transport: returned before anything was written.
    enforce(timer.fired == 0);
  }
}
479
/**
 * Checks that borrow() fails right away when nothing has been written yet.
 *
 * A Trigger which would write 10 bytes after one second is armed while
 * borrowing; the borrow must return null before it fires.
 */
void testBorrowNoneAvailable(CoupledTransports)() if (
  isCoupledTransports!CoupledTransports
) {
  scope transports = new CoupledTransports;
  assert(transports.input);
  assert(transports.output);

  // Attempting to borrow when no data is available should fail immediately
  auto t = Trigger(dur!"seconds"(1), transports.output, 10);

  size_t borrowLen = 10;
  auto borrowedBuf = transports.input.borrow(null, borrowLen);
  enforce(borrowedBuf is null);
  enforce(t.fired == 0);
}
497
498
/**
 * Runs testReadWrite for a single coupled transport type, after scaling
 * totalSize by g_sizeMultiplier.
 *
 * If the test throws, the transport type and the parameters used are written
 * to stdout before the exception propagates.
 */
void doRwTest(CoupledTransports)(
  size_t totalSize,
  SizeGenerator wSizeGen,
  SizeGenerator rSizeGen,
  SizeGenerator wChunkSizeGen = new ConstantSizeGenerator(0),
  SizeGenerator rChunkSizeGen = new ConstantSizeGenerator(0),
  size_t maxOutstanding = 0
) if (
  isCoupledTransports!CoupledTransports
) {
  immutable scaledSize = cast(size_t)(totalSize * g_sizeMultiplier);

  scope(failure) writefln(
    "Test failed for %s: testReadWrite(%s, %s, %s, %s, %s, %s)",
    CoupledTransports.stringof, scaledSize, wSizeGen, rSizeGen,
    wChunkSizeGen, rChunkSizeGen, maxOutstanding);

  testReadWrite!CoupledTransports(scaledSize, wSizeGen, rSizeGen,
    wChunkSizeGen, rChunkSizeGen, maxOutstanding);
}
520
/**
 * Runs all blocking-semantics tests for a single coupled transport type.
 *
 * If one of the tests throws, the transport type and the name of the failing
 * test are written to stdout before the exception propagates.
 */
void doBlockingTest(CoupledTransports)() if (
  isCoupledTransports!CoupledTransports
) {
  // Reports which test failed for which transport type.
  void writeFailure(string name) {
    writefln("Test failed for %s: %s()", CoupledTransports.stringof, name);
  }

  {
    scope(failure) writeFailure("testReadPartAvailable");
    testReadPartAvailable!CoupledTransports();
  }

  {
    scope(failure) writeFailure("testReadPartialMidframe");
    testReadPartialMidframe!CoupledTransports();
  }

  {
    // The label must match the function name so that the failure output
    // points at the right test.
    scope(failure) writeFailure("testReadNoneAvailable");
    testReadNoneAvailable!CoupledTransports();
  }

  {
    scope(failure) writeFailure("testBorrowPartAvailable");
    testBorrowPartAvailable!CoupledTransports();
  }

  {
    scope(failure) writeFailure("testBorrowNoneAvailable");
    testBorrowNoneAvailable!CoupledTransports();
  }
}
553
/// Turns t into a SizeGenerator: values which already are generators are
/// returned as they are, anything else is wrapped in a ConstantSizeGenerator.
SizeGenerator getGenerator(T)(T t) {
  static if (!is(T : SizeGenerator)) {
    return new ConstantSizeGenerator(t);
  } else {
    return t;
  }
}
561
/// Type tuple consisting of T itself, followed by T wrapped in the buffered,
/// framed and zlib coupled wrapper transports.
template WrappedTransports(T) if (isCoupledTransports!T) {
  alias TypeTuple!(
    T,                            // the plain transports
    CoupledBufferedTransports!T,  // ... wrapped in TBufferedTransport
    CoupledFramedTransports!T,    // ... wrapped in TFramedTransport
    CoupledZlibTransports!T       // ... wrapped in TZlibTransport
  ) WrappedTransports;
}
570
/// Shorthand for the full testRw overload, with both chunk sizes and
/// maxOutstanding set to 0.
void testRw(C, R, S)(
  size_t totalSize,
  R wSize,
  S rSize
) if (
  isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
  is(typeof(getGenerator(rSize)))
) {
  testRw!C(
    totalSize,
    wSize,
    rSize,
    0,  // wChunkSize
    0,  // rChunkSize
    0   // maxOutstanding
  );
}
581
/// Runs doRwTest for C and for each of the wrapped variants of C. Every size
/// argument may either be a SizeGenerator or a value accepted by
/// ConstantSizeGenerator; it is converted using getGenerator.
void testRw(C, R, S, T, U)(
  size_t totalSize,
  R wSize,
  S rSize,
  T wChunkSize,
  U rChunkSize,
  size_t maxOutstanding = 0
) if (
  isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
  is(typeof(getGenerator(rSize))) && is(typeof(getGenerator(wChunkSize))) &&
  is(typeof(getGenerator(rChunkSize)))
) {
  // The loop alias has its own name so that it does not hide the template
  // parameter T.
  foreach (Transports; WrappedTransports!C) {
    doRwTest!Transports(
      totalSize,
      getGenerator(wSize),
      getGenerator(rSize),
      getGenerator(wChunkSize),
      getGenerator(rChunkSize),
      maxOutstanding
    );
  }
}
605
/// Runs doBlockingTest for C and for each of the wrapped variants of C.
void testBlocking(C)() if (isCoupledTransports!C) {
  foreach (Transports; WrappedTransports!C) {
    doBlockingTest!Transports();
  }
}
611
// A quick hack, for the sake of brevity: factor by which doRwTest scales
// the total size of each test; set from the "size-multiplier" option in main.
float g_sizeMultiplier = 1.0f;
614
// Default directory for temporary files, used unless "tmp-dir" is given.
version (Posix) {
  immutable string defaultTempDir = "/tmp";
} else version (Windows) {
  import core.sys.windows.windows;
  extern(Windows) DWORD GetTempPathA(DWORD nBufferLength, LPTSTR lpBuffer);

  /// Returns the path reported by GetTempPathA, minus its last character.
  @property string defaultTempDir() {
    char[MAX_PATH + 1] pathBuffer;
    enforce(GetTempPathA(pathBuffer.length, pathBuffer.ptr));
    auto path = to!string(pathBuffer.ptr);
    return path[0 .. $ - 1];
  }
} else static assert(false);
627
/**
 * Test driver: parses the "seed", "size-multiplier" and "tmp-dir" options,
 * seeds the random number generator and runs the read/write and blocking
 * tests for memory buffer, socket and file transports.
 */
void main(string[] args) {
  // Defaults, each of which can be overridden on the command line.
  int seed = unpredictableSeed();
  string tmpDir = defaultTempDir;

  getopt(
    args,
    "seed", &seed,
    "size-multiplier", &g_sizeMultiplier,
    "tmp-dir", &tmpDir
  );
  enforce(g_sizeMultiplier >= 0, "Size multiplier must not be negative.");

  // Print the seed so that a failing run can be repeated.
  writefln("Using seed: %s", seed);
  rndGen().seed(seed);
  CoupledFileTransports.tmpDir = tmpDir;

  auto random4k = new RandomSizeGenerator(1, 4096);

  // Total sizes used below.
  enum mib1 = 1024 * 1024;
  enum kib256 = 1024 * 256;
  enum kib16 = 1024 * 16;

  /*
   * We do basically the same set of tests for each transport type,
   * although we tweak the parameters in some places.
   */

  // TMemoryBuffer tests
  testRw!CoupledMemoryBuffers(mib1, 0, 0);
  testRw!CoupledMemoryBuffers(kib256, random4k, random4k);
  testRw!CoupledMemoryBuffers(kib256, 167, 163);
  testRw!CoupledMemoryBuffers(kib16, 1, 1);

  testRw!CoupledMemoryBuffers(kib256, 0, 0, random4k, random4k);
  testRw!CoupledMemoryBuffers(kib256, random4k, random4k, random4k, random4k);
  testRw!CoupledMemoryBuffers(kib256, 167, 163, random4k, random4k);
  testRw!CoupledMemoryBuffers(kib16, 1, 1, random4k, random4k);

  testBlocking!CoupledMemoryBuffers();

  // TSocket tests
  enum socketMaxOutstanding = 4096;
  testRw!CoupledSocketTransports(mib1, 0, 0,
    0, 0, socketMaxOutstanding);
  testRw!CoupledSocketTransports(kib256, random4k, random4k,
    0, 0, socketMaxOutstanding);
  testRw!CoupledSocketTransports(kib256, 167, 163,
    0, 0, socketMaxOutstanding);
  // Doh. Apparently writing to a socket has some additional overhead for
  // each send() call. If we have more than ~400 outstanding 1-byte write
  // requests, additional send() calls start blocking.
  testRw!CoupledSocketTransports(kib16, 1, 1,
    0, 0, 250);
  testRw!CoupledSocketTransports(kib256, 0, 0,
    random4k, random4k, socketMaxOutstanding);
  testRw!CoupledSocketTransports(kib256, random4k, random4k,
    random4k, random4k, socketMaxOutstanding);
  testRw!CoupledSocketTransports(kib256, 167, 163,
    random4k, random4k, socketMaxOutstanding);
  testRw!CoupledSocketTransports(kib16, 1, 1,
    random4k, random4k, 250);

  testBlocking!CoupledSocketTransports();

  // File transport tests.

  // Cannot write more than the frame size at once.
  enum maxWriteAtOnce = 1024 * 1024 * 16 - 4;

  testRw!CoupledFileTransports(mib1, maxWriteAtOnce, 0);
  testRw!CoupledFileTransports(kib256, random4k, random4k);
  testRw!CoupledFileTransports(kib256, 167, 163);
  testRw!CoupledFileTransports(kib16, 1, 1);

  testRw!CoupledFileTransports(kib256, 0, 0, random4k, random4k);
  testRw!CoupledFileTransports(kib256, random4k, random4k, random4k, random4k);
  testRw!CoupledFileTransports(kib256, 167, 163, random4k, random4k);
  testRw!CoupledFileTransports(kib16, 1, 1, random4k, random4k);

  testBlocking!CoupledFileTransports();
}
701
702
703 /*
704 * Timer handling code for use in tests that check the transport blocking
705 * semantics.
706 *
707 * The implementation has been hacked together in a hurry and wastes a lot of
708 * threads, but speed should not be the concern here.
709 */
710
/**
 * Timer which writes data to a transport from a background thread.
 *
 * Each entry consists of a timeout, a transport and a length. The timer
 * thread waits for the timeout of the first entry, then writes that many
 * bytes to the transport and flushes it, and moves on to the next entry.
 * The timeout of an entry thus only starts to run once the entries before
 * it have been processed.
 *
 * Destroying the Trigger cancels all entries which have not fired yet and
 * waits for the timer thread to finish. Copying is disabled.
 */
struct Trigger {
  /// Creates the trigger with a first entry and starts the timer thread.
  this(Duration timeout, TTransport transport, size_t writeLength) {
    mutex_ = new Mutex;
    cancelCondition_ = new Condition(mutex_);
    info_ = new Info(timeout, transport, writeLength);
    startThread();
  }

  /// Cancels the pending entries and joins the timer thread.
  ~this() {
    synchronized (mutex_) {
      info_ = null;
      cancelCondition_.notifyAll();
    }

    // The timer thread sets thread_ to null right before it exits. Take a
    // single snapshot, so that the reference cannot turn null between the
    // check and the join() call.
    auto timer = thread_;
    if (timer) timer.join();
  }

  @disable this(this) { assert(0); }

  /// Appends another entry to be processed after the existing ones. If there
  /// are no pending entries, a new timer thread is started for it.
  void add(Duration timeout, TTransport transport, size_t writeLength) {
    synchronized (mutex_) {
      auto info = new Info(timeout, transport, writeLength);
      if (info_) {
        // Append to the end of the list of pending entries.
        auto prev = info_;
        while (prev.next) prev = prev.next;
        prev.next = info;
      } else {
        info_ = info;
        startThread();
      }
    }
  }

  /// The number of entries which have fired so far.
  @property short fired() {
    return atomicLoad(fired_);
  }

private:
  /// Body of the timer thread: processes the pending entries one by one
  /// until the list is empty or the trigger has been cancelled.
  void timerThread() {
    // KLUDGE: Make sure the std.concurrency mbox is initialized on the timer
    // thread to be able to unblock the file transport.
    import std.concurrency;
    thisTid;

    synchronized (mutex_) {
      while (info_) {
        auto cancelled = cancelCondition_.wait(info_.timeout);

        // The mutex is released during the wait, so the destructor may have
        // cleared info_ even though the wait reports a timeout (it timed out
        // while the destructor was holding the mutex). Treat that as a
        // cancellation as well instead of dereferencing null below.
        if (cancelled || info_ is null) {
          info_ = null;
          break;
        }

        atomicOp!"+="(fired_, 1);

        // Write some data to the transport to unblock it.
        auto buf = new ubyte[info_.writeLength];
        buf[] = 'b';
        info_.transport.write(buf);
        info_.transport.flush();

        info_ = info_.next;
      }
    }

    thread_ = null;
  }

  /// Creates and starts a thread running timerThread().
  void startThread() {
    thread_ = new Thread(&timerThread);
    thread_.start();
  }

  /// A single pending write; forms a singly linked list via next.
  struct Info {
    this(Duration timeout, TTransport transport, size_t writeLength) {
      this.timeout = timeout;
      this.transport = transport;
      this.writeLength = writeLength;
    }

    Duration timeout;
    TTransport transport;
    size_t writeLength;
    Info* next;
  }

  Info* info_;        // Head of the list of pending entries.
  Thread thread_;     // The current timer thread, if any.
  shared short fired_;

  import core.sync.mutex;
  Mutex mutex_;       // Guards info_ and the list it points to.
  import core.sync.condition;
  Condition cancelCondition_;  // Notified by the destructor to cancel.
}
804