1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 module thrift.util.future;
20
21 import core.atomic;
22 import core.sync.condition;
23 import core.sync.mutex;
24 import core.time : Duration;
25 import std.array : empty, front, popFront;
26 import std.conv : to;
27 import std.exception : enforce;
28 import std.traits : BaseTypeTuple, isSomeFunction, ParameterTypeTuple, ReturnType;
29 import thrift.base;
30 import thrift.util.awaitable;
31 import thrift.util.cancellation;
32
/**
 * Represents an operation which is executed asynchronously and the result of
 * which will become available at some point in the future.
 *
 * Once an operation is completed, the result of the operation can be fetched
 * via the get() family of methods. There are three possible cases: Either the
 * operation succeeded, then its return value is returned, or it failed by
 * throwing, in which case the exception is rethrown, or it was cancelled
 * before, then a TCancelledException is thrown. There might be TFuture
 * implementations which never possibly enter the cancelled state.
 *
 * All methods are thread-safe, but keep in mind that any exception object or
 * result (if it is a reference type, of course) is shared between all
 * get()-family invocations.
 */
interface TFuture(ResultType) {
  /**
   * The status the operation is currently in.
   *
   * An operation starts out in RUNNING status, and changes state to one of the
   * others at most once afterwards.
   */
  TFutureStatus status() @property;

  /**
   * A TAwaitable triggered when the operation leaves the RUNNING status.
   */
  TAwaitable completion() @property;

  /**
   * Convenience shorthand for waiting until the result is available and then
   * get()ing it.
   *
   * If the operation has already completed, the result is immediately
   * returned.
   *
   * The result of this method is »alias this«'d to the interface, so that
   * TFuture can be used as a drop-in replacement for a simple value in
   * synchronous code.
   *
   * Throws: Whatever get() throws once the operation has completed, i.e.
   *   TCancelledException if it was cancelled or the set exception if it
   *   failed.
   */
  final ResultType waitGet() {
    completion.wait();
    return get();
  }

  /// Property wrapper around waitGet(), only needed as the »alias this« target.
  final @property auto waitGetProperty() { return waitGet(); }
  alias waitGetProperty this;

  /**
   * Convenience shorthand for waiting until the result is available and then
   * get()ing it.
   *
   * If the operation completes in time, returns its result (resp. throws an
   * exception for the failed/cancelled cases). If not, throws a
   * TFutureException.
   *
   * Params:
   *   timeout = The maximum amount of time to wait for completion.
   */
  final ResultType waitGet(Duration timeout) {
    enforce(completion.wait(timeout), new TFutureException(
      "Operation did not complete in time."));
    return get();
  }

  /**
   * Returns the result of the operation.
   *
   * Throws: TFutureException if the operation is not yet done,
   *   TCancelledException if it has been cancelled; the set exception if it
   *   failed.
   */
  ResultType get();

  /**
   * Returns the captured exception if the operation failed, or null otherwise.
   *
   * Throws: TFutureException if not yet done, TCancelledException if the
   *   operation has been cancelled.
   */
  Exception getException();
}
111
/**
 * The states an operation exposing a TFuture interface can be in.
 *
 * RUNNING is the first member and thus also the default (.init) value.
 */
enum TFutureStatus : byte {
  /// The operation is still running.
  RUNNING,

  /// The operation completed without throwing an exception.
  SUCCEEDED,

  /// The operation completed by throwing an exception.
  FAILED,

  /// The operation was cancelled.
  CANCELLED
}
121
/**
 * A TFuture covering the simple but common case where the result is simply
 * set by a call to succeed()/fail().
 *
 * All methods are thread-safe, but usually, succeed()/fail() are only called
 * from a single thread (different from the thread(s) waiting for the result
 * using the TFuture interface, though).
 */
class TPromise(ResultType) : TFuture!ResultType {
  /**
   * Creates a new promise. status_ is left at its default value, RUNNING.
   */
  this() {
    statusMutex_ = new Mutex;
    completionEvent_ = new TOneshotEvent;
  }

  /// The status the operation is currently in (read atomically).
  override S status() const @property {
    return atomicLoad(status_);
  }

  /// The event triggered by succeed()/fail()/complete()/cancel().
  override TAwaitable completion() @property {
    return completionEvent_;
  }

  /**
   * Returns the result of the operation.
   *
   * Throws: TFutureException if the operation is not yet done,
   *   TCancelledException if it has been cancelled; the exception passed to
   *   fail() (or taken over by complete()) if it failed.
   */
  override ResultType get() {
    auto s = atomicLoad(status_);
    enforce(s != S.RUNNING,
      new TFutureException("Operation not yet completed."));

    if (s == S.CANCELLED) throw new TCancelledException;
    if (s == S.FAILED) throw exception_;

    static if (!is(ResultType == void)) {
      return result_;
    }
  }

  /**
   * Returns the captured exception if the operation failed, or null if it
   * succeeded.
   *
   * Throws: TFutureException if not yet done, TCancelledException if the
   *   operation has been cancelled.
   */
  override Exception getException() {
    auto s = atomicLoad(status_);
    enforce(s != S.RUNNING,
      new TFutureException("Operation not yet completed."));

    if (s == S.CANCELLED) throw new TCancelledException;
    if (s == S.SUCCEEDED) return null;

    return exception_;
  }

  static if (!is(ResultType == void)) {
    /**
     * Sets the result of the operation, marks it as done, and notifies any
     * waiters.
     *
     * If the operation has been cancelled before, nothing happens.
     *
     * Throws: TFutureException if the operation is already completed.
     */
    void succeed(ResultType result) {
      synchronized (statusMutex_) {
        auto s = atomicLoad(status_);
        if (s == S.CANCELLED) return;

        enforce(s == S.RUNNING,
          new TFutureException("Operation already completed."));

        // Store the payload before publishing the new status, so that readers
        // observing SUCCEEDED find the result in place.
        result_ = result;
        atomicStore(status_, S.SUCCEEDED);
      }

      completionEvent_.trigger();
    }
  } else {
    /**
     * Marks the operation as done and notifies any waiters.
     *
     * If the operation has been cancelled before, nothing happens.
     *
     * Throws: TFutureException if the operation is already completed.
     */
    void succeed() {
      synchronized (statusMutex_) {
        auto s = atomicLoad(status_);
        if (s == S.CANCELLED) return;

        enforce(s == S.RUNNING,
          new TFutureException("Operation already completed."));

        atomicStore(status_, S.SUCCEEDED);
      }

      completionEvent_.trigger();
    }
  }

  /**
   * Marks the operation as failed with the specified exception and notifies
   * any waiters.
   *
   * If the operation was already cancelled, nothing happens.
   *
   * Throws: TFutureException if the operation is already completed.
   */
  void fail(Exception exception) {
    synchronized (statusMutex_) {
      auto s = atomicLoad(status_);
      if (s == S.CANCELLED) return;

      enforce(s == S.RUNNING,
        new TFutureException("Operation already completed."));

      // Store the payload before publishing the new status.
      exception_ = exception;
      atomicStore(status_, S.FAILED);
    }

    completionEvent_.trigger();
  }

  /**
   * Marks this operation as completed and takes over the outcome of another
   * TFuture of the same type.
   *
   * If this operation was already cancelled, nothing happens. If the other
   * operation was cancelled, this operation is marked as failed with a
   * TCancelledException.
   *
   * Throws: TFutureException if the passed in future was not completed or
   *   this operation is already completed.
   */
  void complete(TFuture!ResultType future) {
    synchronized (statusMutex_) {
      auto s = atomicLoad(status_);
      if (s == S.CANCELLED) return;
      enforce(s == S.RUNNING,
        new TFutureException("Operation already completed."));

      // Read the other future's status once and work with that snapshot.
      auto outcome = future.status;
      enforce(outcome != S.RUNNING, new TFutureException(
        "The passed TFuture is not yet completed."));

      if (outcome == S.CANCELLED) {
        // A cancelled source is reported as a failure of this operation.
        outcome = S.FAILED;
        exception_ = new TCancelledException;
      } else if (outcome == S.FAILED) {
        exception_ = future.getException();
      } else static if (!is(ResultType == void)) {
        result_ = future.get();
      }

      atomicStore(status_, outcome);
    }

    completionEvent_.trigger();
  }

  /**
   * Marks this operation as cancelled and notifies any waiters.
   *
   * If the operation is already completed, nothing happens.
   */
  void cancel() {
    synchronized (statusMutex_) {
      // Only a still running operation can be cancelled. In every other case
      // the call which completed the operation is responsible for triggering
      // the completion event, so there is nothing left to do here.
      if (atomicLoad(status_) != S.RUNNING) return;

      atomicStore(status_, S.CANCELLED);
    }

    completionEvent_.trigger();
  }

private:
  // Convenience alias because TFutureStatus is ubiquitous in this class.
  alias TFutureStatus S;

  // The status the promise is currently in.
  shared S status_;

  union {
    static if (!is(ResultType == void)) {
      // Set if status_ is SUCCEEDED.
      ResultType result_;
    }
    // Set if status_ is FAILED.
    Exception exception_;
  }

  // Protects status_.
  // As for result_ and exception_: They are only set once, while status_ is
  // still RUNNING, so given that the operation has already completed, reading
  // them is safe without holding some kind of lock.
  Mutex statusMutex_;

  // Triggered when the operation completes.
  TOneshotEvent completionEvent_;
}
307
/**
 * Exception type used by the future/promise code in this module, e.g. when
 * a result is requested before the operation has completed, when an
 * operation is completed twice, or when waiting for a result timed out.
 */
class TFutureException : TException {
  /**
   * Forwards all arguments unchanged to the TException constructor.
   */
  this(
    string msg = "",
    string file = __FILE__,
    size_t line = __LINE__,
    Throwable next = null
  ) {
    super(msg, file, line, next);
  }
}
317
/**
 * Creates an interface that is similar to a given one, but accepts an
 * additional, optional TCancellation parameter for each method, and returns
 * TFutures instead of plain return values.
 *
 * Base interfaces of the given interface are mapped to the corresponding
 * TFutureInterface instances.
 *
 * For example, given the following declarations:
 * ---
 * interface Foo {
 *   void bar();
 *   string baz(int a);
 * }
 * alias TFutureInterface!Foo FutureFoo;
 * ---
 *
 * FutureFoo would be equivalent to:
 * ---
 * interface FutureFoo {
 *   TFuture!void bar(TCancellation cancellation = null);
 *   TFuture!string baz(int a, TCancellation cancellation = null);
 * }
 * ---
 */
template TFutureInterface(Interface) if (is(Interface _ == interface)) {
  // The interface declaration is assembled as a string at compile time and
  // then mixed in as the eponymous member of this template.
  mixin({
    string decl = "interface TFutureInterface \n";

    // Inheritance list: one TFutureInterface instance per base interface.
    static if (is(Interface Bases == super) && Bases.length > 0) {
      decl ~= ": ";
      foreach (baseIndex; 0 .. Bases.length) {
        if (baseIndex != 0) {
          decl ~= ", ";
        }
        decl ~= "TFutureInterface!(BaseTypeTuple!Interface[";
        decl ~= to!string(baseIndex);
        decl ~= "]) ";
      }
    }

    decl ~= "{\n";

    // One future-returning method per function member of the interface.
    foreach (memberName; __traits(derivedMembers, Interface)) {
      enum qualifiedName = "Interface." ~ memberName;
      static if (isSomeFunction!(mixin(qualifiedName))) {
        decl ~= "TFuture!(ReturnType!(" ~ qualifiedName ~ ")) ";
        decl ~= memberName;
        decl ~= "(ParameterTypeTuple!(" ~ qualifiedName ~
          "), TCancellation cancellation = null);\n";
      }
    }

    decl ~= "}\n";
    return decl;
  }());
}
366
/**
 * An input range that aggregates results from multiple asynchronous operations,
 * returning them in the order they arrive.
 *
 * Results of failed futures are skipped, their exceptions are collected and
 * available via exceptions(). Cancelled futures do not yield anything.
 *
 * Additionally, a timeout can be set after which results from not yet finished
 * futures will no longer be waited for, e.g. to ensure the time it takes to
 * iterate over a set of results is limited.
 */
final class TFutureAggregatorRange(T) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   futures = The set of futures to collect results from.
   *   childCancellation = Triggered when the timeout expires while the range
   *     is still waiting for results.
   *   timeout = If positive, not yet finished futures will be cancelled and
   *     their results will not be taken into account. If zero (or negative),
   *     no timeout is applied and the range waits until all futures have
   *     completed or have been cancelled.
   */
  this(TFuture!T[] futures, TCancellationOrigin childCancellation,
    Duration timeout = dur!"hnsecs"(0)
  ) {
    hasTimeout_ = timeout > dur!"hnsecs"(0);
    if (hasTimeout_) {
      timeoutSysTick_ = TickDuration.currSystemTick +
        TickDuration.from!"hnsecs"(timeout.total!"hnsecs");
    } else {
      timeoutSysTick_ = TickDuration(0);
    }

    queueMutex_ = new Mutex;
    queueNonEmptyCondition_ = new Condition(queueMutex_);
    futures_ = futures;
    childCancellation_ = childCancellation;

    foreach (future; futures_) {
      // The outer delegate literal is invoked immediately so that every
      // callback captures its own copy of the loop variable.
      future.completion.addCallback({
        auto f = future;
        return {
          if (f.status == TFutureStatus.CANCELLED) {
            // Cancelled futures never yield a result, but they need to be
            // accounted for so that a consumer waiting without a timeout can
            // tell when nothing is left to wait for.
            synchronized (queueMutex_) {
              ++cancelledCount_;
              queueNonEmptyCondition_.notifyAll();
            }
            return;
          }
          assert(f.status != TFutureStatus.RUNNING);

          synchronized (queueMutex_) {
            completedQueue_ ~= f;

            // Waiters only block while the queue is empty, so only the
            // empty -> non-empty transition needs to be signalled.
            if (completedQueue_.length == 1) {
              queueNonEmptyCondition_.notifyAll();
            }
          }
        };
      }());
    }
  }

  /**
   * Whether the range is empty.
   *
   * This is the case if the results from the completed futures not having
   * failed have already been popped and either all futures have been finished
   * or the timeout has expired.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   */
  bool empty() @property {
    // A buffered result must still be handed out even if no further result
    // can follow, so this check has to come before the finished_ one.
    if (bufferFilled_) return false;
    if (finished_) return true;

    while (true) {
      TFuture!T future;
      synchronized (queueMutex_) {
        // The while loop also guards against spurious wakeups, in case they
        // should be possible.
        while (completedQueue_.empty) {
          if (completedCount_ + cancelledCount_ >= futures_.length) {
            // Every future has either been consumed or was cancelled (or
            // there were none to begin with) – nothing can arrive anymore.
            finished_ = true;
            return true;
          }

          if (hasTimeout_) {
            auto remaining = to!Duration(timeoutSysTick_ -
              TickDuration.currSystemTick);

            if (remaining <= dur!"hnsecs"(0)) {
              // No time left, but still no element received – we are empty
              // now.
              finished_ = true;
              childCancellation_.trigger();
              return true;
            }

            queueNonEmptyCondition_.wait(remaining);
          } else {
            // No timeout set: wait until a callback signals a change.
            queueNonEmptyCondition_.wait();
          }
        }

        future = completedQueue_.front;
        completedQueue_.popFront();
      }

      ++completedCount_;
      if (completedCount_ == futures_.length) {
        // This was the last future in the list, there is no possibility
        // another result could ever become available.
        finished_ = true;
      }

      if (future.status == TFutureStatus.FAILED) {
        // This one failed, remember its exception and try getting another
        // item from the queue (unless it was the last one).
        exceptions_ ~= future.getException();
        if (finished_) return true;
      } else {
        resultBuffer_ = future.get();
        bufferFilled_ = true;
        return false;
      }
    }
  }

  /**
   * Returns the first element from the range.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   *
   * Throws: TException if the range is empty.
   */
  T front() {
    enforce(!empty, new TException(
      "Cannot get front of an empty future aggregator range."));
    return resultBuffer_;
  }

  /**
   * Removes the first element from the range.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   *
   * Throws: TException if the range is empty.
   */
  void popFront() {
    enforce(!empty, new TException(
      "Cannot pop front of an empty future aggregator range."));
    bufferFilled_ = false;
  }

  /**
   * The number of futures the result of which has been returned or which have
   * failed so far.
   */
  size_t completedCount() @property const {
    return completedCount_;
  }

  /**
   * The exceptions collected from failed TFutures so far.
   */
  Exception[] exceptions() @property {
    return exceptions_;
  }

private:
  TFuture!T[] futures_;
  TCancellationOrigin childCancellation_;

  // Whether a (positive) timeout has been set.
  bool hasTimeout_;

  // The system tick this operation will time out, or zero if no timeout has
  // been set.
  TickDuration timeoutSysTick_;

  // Set once no further result can become available.
  bool finished_;

  // Whether resultBuffer_ holds a result not yet removed via popFront().
  bool bufferFilled_;
  T resultBuffer_;

  Exception[] exceptions_;
  size_t completedCount_;

  // The queue of completed futures and the number of cancelled ones. These
  // (and the associated mutex/condition) are the only parts of this class
  // that are accessed by multiple threads.
  TFuture!T[] completedQueue_;
  size_t cancelledCount_;
  Mutex queueMutex_;
  Condition queueNonEmptyCondition_;
}
539
/**
 * TFutureAggregatorRange construction helper to avoid having to explicitly
 * specify the value type, i.e. to allow the constructor being called using IFTI
 * (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 *
 * All arguments are passed on unchanged to the TFutureAggregatorRange
 * constructor.
 */
TFutureAggregatorRange!T tFutureAggregatorRange(T)(TFuture!T[] futures,
  TCancellationOrigin childCancellation, Duration timeout = dur!"hnsecs"(0)
) {
  auto aggregator =
    new TFutureAggregatorRange!T(futures, childCancellation, timeout);
  return aggregator;
}
550