1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.util.awaitable;
20 
21 import core.sync.condition;
22 import core.sync.mutex;
23 import core.time : Duration;
24 import std.exception : enforce;
25 import std.socket/+ : Socket, socketPair+/; // DMD @@BUG314@@
26 import thrift.base;
27 
28 // To avoid DMD @@BUG6395@@.
29 import thrift.internal.algorithm;
30 
31 /**
32  * An event that can occur at some point in the future and which can be
33  * awaited, either by blocking until it occurs, or by registering a callback
34  * delegate.
35  */
36 interface TAwaitable {
37   /**
38    * Waits until the event occurs.
39    *
40    * Calling wait() for an event that has already occurred is a no-op.
41    */
42   void wait();
43 
44   /**
45    * Waits until the event occurs or the specified timeout expires.
46    *
47    * Calling wait() for an event that has already occurred is a no-op.
48    *
49    * Returns: Whether the event was triggered before the timeout expired.
50    */
51   bool wait(Duration timeout);
52 
53   /**
54    * Registers a callback that is called if the event occurs.
55    *
56    * The delegate will likely be invoked from a different thread, and is
57    * expected not to perform expensive work as it will usually be invoked
58    * synchronously by the notifying thread. The order in which registered
59    * callbacks are invoked is not specified.
60    *
61    * The callback must never throw, but nothrow semantics are difficult to
62    * enforce, so currently exceptions are just swallowed by
63    * TAwaitable implementations.
64    *
65    * If the event has already occurred, the delegate is immediately executed
66    * in the current thread.
67    */
68   void addCallback(void delegate() dg);
69 
70   /**
71    * Removes a previously added callback.
72    *
73    * Returns: Whether the callback could be found in the list, i.e. whether it
74    *   was previously added.
75    */
76   bool removeCallback(void delegate() dg);
77 }
78 
79 /**
80  * A simple TAwaitable event triggered by just calling a trigger() method.
81  */
82 class TOneshotEvent : TAwaitable {
this()83   this() {
84     mutex_ = new Mutex;
85     condition_ = new Condition(mutex_);
86   }
87 
wait()88   override void wait() {
89     synchronized (mutex_) {
90       while (!triggered_) condition_.wait();
91     }
92   }
93 
wait(Duration timeout)94   override bool wait(Duration timeout) {
95     synchronized (mutex_) {
96       if (triggered_) return true;
97       condition_.wait(timeout);
98       return triggered_;
99     }
100   }
101 
addCallback(void delegate ()dg)102   override void addCallback(void delegate() dg) {
103     mutex_.lock();
104     scope (failure) mutex_.unlock();
105 
106     callbacks_ ~= dg;
107 
108     if (triggered_) {
109       mutex_.unlock();
110       dg();
111       return;
112     }
113 
114     mutex_.unlock();
115   }
116 
removeCallback(void delegate ()dg)117   override bool removeCallback(void delegate() dg) {
118     synchronized (mutex_) {
119       auto oldLength = callbacks_.length;
120       callbacks_ = removeEqual(callbacks_, dg);
121       return callbacks_.length < oldLength;
122     }
123   }
124 
125   /**
126    * Triggers the event.
127    *
128    * Any registered event callbacks are executed synchronously before the
129    * function returns.
130    */
trigger()131   void trigger() {
132     synchronized (mutex_) {
133       if (!triggered_) {
134         triggered_ = true;
135         condition_.notifyAll();
136         foreach (c; callbacks_) c();
137       }
138     }
139   }
140 
141 private:
142   bool triggered_;
143   Mutex mutex_;
144   Condition condition_;
145   void delegate()[] callbacks_;
146 }
147 
148 /**
149  * Translates TAwaitable events into dummy messages on a socket that can be
150  * used e.g. to wake up from a select() call.
151  */
152 final class TSocketNotifier {
this()153   this() {
154     auto socks = socketPair();
155     foreach (s; socks) s.blocking = false;
156     sendSocket_ = socks[0];
157     recvSocket_ = socks[1];
158   }
159 
160   /**
161    * The socket the messages will be sent to.
162    */
socket()163   Socket socket() @property {
164     return recvSocket_;
165   }
166 
167   /**
168    * Atatches the socket notifier to the specified awaitable, causing it to
169    * write a byte to the notification socket when the awaitable callbacks are
170    * invoked.
171    *
172    * If the event has already been triggered, the dummy byte is written
173    * immediately to the socket.
174    *
175    * A socket notifier can only be attached to a single awaitable at a time.
176    *
177    * Throws: TException if the socket notifier is already attached.
178    */
attach(TAwaitable awaitable)179   void attach(TAwaitable awaitable) {
180     enforce(!awaitable_, new TException("Already attached."));
181     awaitable.addCallback(&notify);
182     awaitable_ = awaitable;
183   }
184 
185   /**
186    * Detaches the socket notifier from the awaitable it is currently attached
187    * to.
188    *
189    * Throws: TException if the socket notifier is not currently attached.
190    */
detach()191   void detach() {
192     enforce(awaitable_, new TException("Not attached."));
193 
194     // Soak up any not currently read notification bytes.
195     ubyte[1] dummy = void;
196     while (recvSocket_.receive(dummy) != Socket.ERROR) {}
197 
198     auto couldRemove = awaitable_.removeCallback(&notify);
199     assert(couldRemove);
200     awaitable_ = null;
201   }
202 
203 private:
notify()204   void notify() {
205     ubyte[1] zero;
206     sendSocket_.send(zero);
207   }
208 
209   TAwaitable awaitable_;
210   Socket sendSocket_;
211   Socket recvSocket_;
212 }
213