1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.async.libevent;
20 
21 import core.atomic;
22 import core.time : Duration, dur;
23 import core.exception : onOutOfMemoryError;
24 import core.memory : GC;
25 import core.thread : Fiber, Thread;
26 import core.sync.condition;
27 import core.sync.mutex;
28 import core.stdc.stdlib : free, malloc;
29 import deimos.event2.event;
30 import std.array : empty, front, popFront;
31 import std.conv : text, to;
32 import std.exception : enforce;
33 import std.socket : Socket, socketPair;
34 import thrift.base;
35 import thrift.async.base;
36 import thrift.internal.socket;
37 import thrift.internal.traits;
38 import thrift.util.cancellation;
39 
40 // To avoid DMD @@BUG6395@@.
41 import thrift.internal.algorithm;
42 
43 /**
44  * A TAsyncManager implementation based on libevent.
45  *
46  * The libevent loop for handling non-blocking sockets is run in a background
47  * thread, which is lazily spawned. The thread is not daemonized to avoid
48  * crashes on program shutdown, it is only stopped when the manager instance
49  * is destroyed. So, to ensure a clean program teardown, either make sure this
50  * instance gets destroyed (e.g. by using scope), or manually call stop() at
51  * the end.
52  */
class TLibeventAsyncManager : TAsyncSocketManager {
  /**
   * Creates a new instance, setting up the libevent base, the socket pair
   * used to pass control messages to the event loop, and the mutex/condition
   * pair used to track the number of unfinished work items.
   *
   * The worker thread itself is not started here; it is lazily spawned by
   * ensureWorkerThreadRunning() on first use.
   */
  this() {
    eventBase_ = event_base_new();

    // Set up the socket pair for transferring control messages to the event
    // loop.
    auto pair = socketPair();
    controlSendSocket_ = pair[0];
    controlReceiveSocket_ = pair[1];
    controlReceiveSocket_.blocking = false;

    // Register an event for receiving control messages. The event is
    // persistent and edge-triggered (EV_PERSIST | EV_ET) and dispatches to
    // receiveControlMsg() via the extern(C) trampoline below, with `this`
    // smuggled through as the callback argument.
    controlReceiveEvent_ = event_new(eventBase_, cast(evutil_socket_t)controlReceiveSocket_.handle,
      EV_READ | EV_PERSIST | EV_ET, assumeNothrow(&controlMsgReceiveCallback),
      cast(void*)this);
    event_add(controlReceiveEvent_, null);

    queuedCountMutex_ = new Mutex;
    zeroQueuedCondition_ = new Condition(queuedCountMutex_);
  }

  /**
   * Stops the worker thread immediately (zero timeout, pending work is
   * discarded) and frees the libevent resources.
   */
  ~this() {
    // stop() should be safe to call, because either we don't have a worker
    // thread running and it is a no-op anyway, or it is guaranteed to be
    // still running (blocked in event_base_loop), and thus guaranteed not to
    // be garbage collected yet.
    stop(dur!"hnsecs"(0));

    event_free(controlReceiveEvent_);
    event_base_free(eventBase_);
    eventBase_ = null;
  }

  /**
   * Enqueues a work item for the given transport by sending a WORK control
   * message to the worker thread, where items for the same transport are
   * executed one after another (see receiveControlMsg()/executeWork()).
   *
   * If the cancellation has already been triggered, the item is silently
   * dropped; if it triggers later, a CANCEL message is sent so the item is
   * removed from the queue — unless it is already being executed.
   */
  override void execute(TAsyncTransport transport, Work work,
    TCancellation cancellation = null
  ) {
    if (cancellation && cancellation.triggered) return;

    // Keep track that there is a new work item to be processed.
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    // We should be able to send the control message as a whole – we currently
    // assume to be able to receive it at once as well. If this proves to be
    // unstable (e.g. send could possibly return early if the receiving buffer
    // is full and the blocking call gets interrupted by a signal), it could
    // be changed to a more sophisticated scheme.

    // Make sure the delegate context doesn't get GCd while the work item is
    // on the wire (the raw bytes on the socket are invisible to the GC). The
    // matching removeRoot() happens in receiveControlMsg().
    GC.addRoot(work.ptr);

    // Send work message.
    sendControlMsg(ControlMsg(MsgType.WORK, work, transport));

    if (cancellation) {
      cancellation.triggering.addCallback({
        sendControlMsg(ControlMsg(MsgType.CANCEL, work, transport));
      });
    }
  }

  /**
   * Executes the given delegate on the worker thread after (at least) the
   * given duration has passed. The delay is counted as a queued item until
   * the callback has run, so stop() can wait for it.
   */
  override void delay(Duration duration, void delegate() work) {
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    const tv = toTimeval(duration);

    // fd -1 / event type 0: pure timeout event, no socket involved.
    // DMD @@BUG@@: Cannot deduce T to void delegate() here.
    registerOneshotEvent!(void delegate())(
      -1, 0, assumeNothrow(&delayCallback), &tv,
      {
        work();
        decrementQueuedCount();
      }
    );
  }

  /**
   * Shuts down the worker thread, optionally waiting for queued work items
   * to finish first.
   *
   * Params:
   *   waitFinishTimeout = If positive, waits at most that long for the
   *     queued count to reach zero; if negative (the default), waits
   *     indefinitely; if zero, shuts down immediately.
   *
   * Returns: true if no work items were left unprocessed when the worker
   *   thread was stopped (i.e. queuedCount_ was zero).
   */
  override bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)) {
    bool cleanExit = true;

    // synchronized (this) also serializes stop() against
    // ensureWorkerThreadRunning(), so the worker cannot be restarted while
    // we are tearing it down.
    synchronized (this) {
      if (workerThread_) {
        synchronized (queuedCountMutex_) {
          if (waitFinishTimeout > dur!"hnsecs"(0)) {
            // NOTE(review): single timed wait, not a loop — a spurious
            // wakeup before the timeout would make this proceed early with
            // cleanExit based on the then-current count; confirm whether a
            // deadline-based wait loop was intended.
            if (queuedCount_ > 0) {
              zeroQueuedCondition_.wait(waitFinishTimeout);
            }
          } else if (waitFinishTimeout < dur!"hnsecs"(0)) {
            while (queuedCount_ > 0) zeroQueuedCondition_.wait();
          } else {
            // waitFinishTimeout is zero, immediately exit in all cases.
          }
          cleanExit = (queuedCount_ == 0);
        }

        // Break out of the event loop, and send a message to wake the worker
        // thread up in case it is blocked waiting for events.
        event_base_loopbreak(eventBase_);
        sendControlMsg(ControlMsg(MsgType.SHUTDOWN));
        workerThread_.join();
        workQueues_ = null;
        // We have nuked all currently enqueued items, so set the count to
        // zero. This is safe to do without locking, since the worker thread
        // is down.
        queuedCount_ = 0;
        atomicStore(*(cast(shared)&workerThread_), cast(shared(Thread))null);
      }
    }

    return cleanExit;
  }

  /// Registers a one-shot listener for the given socket event, without a
  /// timeout.
  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
     TSocketEventListener listener
  ) {
    addOneshotListenerImpl(socket, eventType, null, listener);
  }

  /// Registers a one-shot listener for the given socket event; if timeout is
  /// positive, the listener is invoked with TAsyncEventReason.TIMED_OUT once
  /// it elapses.
  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
    Duration timeout, TSocketEventListener listener
  ) {
    if (timeout <= dur!"hnsecs"(0)) {
      addOneshotListenerImpl(socket, eventType, null, listener);
    } else {
      // This is not really documented well, but libevent does not require to
      // keep the timeval around after the event was added.
      auto tv = toTimeval(timeout);
      addOneshotListenerImpl(socket, eventType, &tv, listener);
    }
  }

private:
  // The type of work items handled by this manager.
  alias void delegate() Work;

  /// Common implementation for both addOneshotListener() overloads; a null
  /// timeout means no timeout.
  void addOneshotListenerImpl(Socket socket, TAsyncEventType eventType,
     const(timeval)* timeout, TSocketEventListener listener
  ) {
    registerOneshotEvent(cast(evutil_socket_t)socket.handle, libeventEventType(eventType),
      assumeNothrow(&socketCallback), timeout, listener);
  }

  /**
   * Registers a one-shot event with libevent, copying the given payload to
   * the C heap so it stays valid until the callback fires.
   *
   * The payload copy is registered as a GC range (it may contain pointers,
   * e.g. a delegate context); the callback is responsible for removing the
   * range and freeing the memory again (see socketCallback/delayCallback).
   */
  void registerOneshotEvent(T)(evutil_socket_t fd, short type,
    event_callback_fn callback, const(timeval)* timeout, T payload
  ) {
    // Create a copy of the payload on the C heap.
    auto payloadMem = malloc(payload.sizeof);
    if (!payloadMem) onOutOfMemoryError();
    (cast(T*)payloadMem)[0 .. 1] = payload;
    GC.addRange(payloadMem, payload.sizeof);

    auto result = event_base_once(eventBase_, fd, type, callback,
      payloadMem, timeout);

    // Assuming that we didn't get our arguments wrong above, the only other
    // situation in which event_base_once can fail is when it can't allocate
    // memory.
    if (result != 0) onOutOfMemoryError();
  }

  /// The type of a message sent over the control socket pair.
  enum MsgType : ubyte {
    SHUTDOWN, /// Only wakes the worker thread up so it notices loopbreak.
    WORK, /// Enqueue (and possibly immediately execute) a new work item.
    CANCEL /// Remove a queued, not-yet-running work item from its queue.
  }

  /// A control message passed to the worker thread. work and transport are
  /// only meaningful for WORK and CANCEL messages.
  struct ControlMsg {
    MsgType type;
    Work work;
    TAsyncTransport transport;
  }

  /**
   * Starts the worker thread if it is not already running.
   *
   * Uses double-checked locking: a lock-free atomicLoad fast path, with the
   * actual check-and-start under synchronized (this).
   */
  void ensureWorkerThreadRunning() {
    // Technically, only half barriers would be required here, but adding the
    // argument seems to trigger a DMD template argument deduction @@BUG@@.
    if (!atomicLoad(*(cast(shared)&workerThread_))) {
      synchronized (this) {
        if (!workerThread_) {
          auto thread = new Thread({ event_base_loop(eventBase_, 0); });
          thread.start();
          atomicStore(*(cast(shared)&workerThread_), cast(shared)thread);
        }
      }
    }
  }

  /**
   * Sends a control message to the worker thread.
   *
   * Throws: TException if the message could not be transmitted in a single
   *   send() call (short or failed write).
   */
  void sendControlMsg(const(ControlMsg) msg) {
    auto result = controlSendSocket_.send((&msg)[0 .. 1]);
    enum size = msg.sizeof;
    enforce(result == size, new TException(text(
      "Sending control message of type ", msg.type, " failed (", result,
      " bytes instead of ", size, " transmitted).")));
  }

  /**
   * Receives messages from the control message socket and acts on them. Called
   * from the worker thread.
   *
   * Loops until the non-blocking receive yields anything other than exactly
   * one whole message (WOULD_BLOCK when drained, or an error/short read).
   */
  void receiveControlMsg() {
    // Read as many new work items off the socket as possible (at least one
    // should be available, as we got notified by libevent).
    ControlMsg msg;
    ptrdiff_t bytesRead;
    while (true) {
      bytesRead = controlReceiveSocket_.receive(cast(ubyte[])((&msg)[0 .. 1]));

      if (bytesRead < 0) {
        auto errno = getSocketErrno();
        if (errno != WOULD_BLOCK_ERRNO) {
          logError("Reading control message, some work item will possibly " ~
            "never be executed: %s", socketErrnoString(errno));
        }
      }
      if (bytesRead != msg.sizeof) break;

      // Everything went fine, we received a new control message.
      final switch (msg.type) {
        case MsgType.SHUTDOWN:
          // The message was just intended to wake us up for shutdown.
          break;

        case MsgType.CANCEL:
          // When processing a cancellation, we must not touch the first item,
          // since it is already being processed.
          // NOTE(review): this assumes an entry for the transport already
          // exists in workQueues_ (i.e. the WORK message arrived first, which
          // the in-order control socket should guarantee) — otherwise the AA
          // lookup would throw a RangeError.
          auto queue = workQueues_[msg.transport];
          if (queue.length > 0) {
            workQueues_[msg.transport] = [queue[0]] ~
              removeEqual(queue[1 .. $], msg.work);
          }
          break;

        case MsgType.WORK:
          // Now that the work item is back in the D world, we don't need the
          // extra GC root for the context pointer anymore (see execute()).
          GC.removeRoot(msg.work.ptr);

          // Add the work item to the queue and execute it.
          auto queue = msg.transport in workQueues_;
          if (queue is null || (*queue).empty) {
            // If the queue is empty, add the new work item to the queue as well,
            // but immediately start executing it.
            workQueues_[msg.transport] = [msg.work];
            executeWork(msg.transport, msg.work);
          } else {
            // Another fiber is already draining this transport's queue (see
            // executeWork()); just append and let it pick the item up.
            (*queue) ~= msg.work;
          }
          break;
      }
    }

    // If the last read was successful, but didn't read enough bytes, we got
    // a problem.
    if (bytesRead > 0) {
      logError("Unexpected partial control message read (%s byte(s) " ~
        "instead of %s), some work item will possibly never be executed.",
        bytesRead, msg.sizeof);
    }
  }

  /**
   * Executes the given work item and all others enqueued for the same
   * transport in a new fiber. Called from the worker thread.
   */
  void executeWork(TAsyncTransport transport, Work work) {
    (new Fiber({
      auto item = work;
      while (true) {
        try {
          // Execute the actual work. It will possibly add listeners to the
          // event loop and yield away if it has to wait for blocking
          // operations. It is quite possible that another fiber will modify
          // the work queue for the current transport.
          item();
        } catch (Exception e) {
          // This should never happen, just to be sure the worker thread
          // doesn't stop working in mysterious ways because of an unhandled
          // exception.
          logError("Exception thrown by work item: %s", e);
        }

        // Remove the item from the work queue.
        // Note: Due to the value semantics of array slices, we have to
        // re-lookup this on every iteration. This could be solved, but I'd
        // rather replace this directly with a queue type once one becomes
        // available in Phobos.
        auto queue = workQueues_[transport];
        assert(queue.front == item);
        queue.popFront();
        workQueues_[transport] = queue;

        // Now that the work item is done, no longer count it as queued.
        decrementQueuedCount();

        if (queue.empty) break;

        // If the queue is not empty, execute the next waiting item.
        item = queue.front;
      }
    })).call();
  }

  /**
   * Increments the amount of queued items.
   */
  void incrementQueuedCount() {
    synchronized (queuedCountMutex_) {
      ++queuedCount_;
    }
  }

  /**
   * Decrements the amount of queued items, waking up any stop() callers
   * waiting on zeroQueuedCondition_ when the count reaches zero.
   */
  void decrementQueuedCount() {
    synchronized (queuedCountMutex_) {
      assert(queuedCount_ > 0);
      --queuedCount_;
      if (queuedCount_ == 0) {
        zeroQueuedCondition_.notifyAll();
      }
    }
  }

  /// extern(C) trampoline invoked by libevent when control messages are
  /// available; forwards to receiveControlMsg() on the manager instance
  /// passed as the callback argument.
  static extern(C) void controlMsgReceiveCallback(evutil_socket_t, short,
    void *managerThis
  ) {
    (cast(TLibeventAsyncManager)managerThis).receiveControlMsg();
  }

  /// extern(C) trampoline for socket events registered via
  /// addOneshotListenerImpl(); invokes the C-heap-copied listener and
  /// releases the payload memory again.
  static extern(C) void socketCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    auto reason = (flags & EV_TIMEOUT) ? TAsyncEventReason.TIMED_OUT :
      TAsyncEventReason.NORMAL;
    (*(cast(TSocketEventListener*)arg))(reason);
    GC.removeRange(arg);
    // NOTE(review): destroy() on a void* only nulls the local pointer copy,
    // it does not finalize the pointed-to listener — presumably
    // destroy(*cast(TSocketEventListener*)arg) was intended; harmless for a
    // delegate, which has no destructor.
    destroy(arg);
    free(arg);
  }

  /// extern(C) trampoline for delay() timeouts; invokes the C-heap-copied
  /// delegate and releases the payload memory again.
  static extern(C) void delayCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    assert(flags & EV_TIMEOUT);
    (*(cast(void delegate()*)arg))();
    GC.removeRange(arg);
    // NOTE(review): same destroy()-on-void* pattern as in socketCallback —
    // only nulls the local pointer; harmless for a delegate.
    destroy(arg);
    free(arg);
  }

  /// The worker thread running the libevent loop; null while not (yet)
  /// started. Accessed via atomic ops outside synchronized blocks (see
  /// ensureWorkerThreadRunning() and stop()).
  Thread workerThread_;

  /// The libevent base all events are registered with.
  event_base* eventBase_;

  /// The socket used for receiving new work items in the event loop. Paired
  /// with controlSendSocket_. Invalid (i.e. TAsyncWorkItem.init) items are
  /// ignored and can be used to wake up the worker thread.
  Socket controlReceiveSocket_;
  event* controlReceiveEvent_;

  /// The socket used to send new work items to the event loop. It is
  /// expected that work items can always be read at once from it, i.e. that
  /// there will never be short reads.
  Socket controlSendSocket_;

  /// Queued up work delegates for async transports. This also includes
  /// currently active ones, they are removed from the queue on completion,
  /// which is relied on by the control message receive fiber (the main one)
  /// to decide whether to immediately start executing items or not.
  // TODO: This should really be of some queue type, not an array slice, but
  // std.container doesn't have anything.
  Work[][TAsyncTransport] workQueues_;

  /// The total number of work items not yet finished (queued and currently
  /// executed) and delays not yet executed.
  uint queuedCount_;

  /// Protects queuedCount_.
  Mutex queuedCountMutex_;

  /// Triggered when queuedCount_ reaches zero, protected by queuedCountMutex_.
  Condition zeroQueuedCondition_;
}
442 
private {
  /**
   * Converts a Duration into the equivalent libevent timeval, splitting it
   * into whole seconds and the remaining microseconds.
   */
  timeval toTimeval(const(Duration) dur) {
    timeval result;
    dur.split!("seconds", "usecs")(result.tv_sec, result.tv_usec);
    return result;
  }

  /**
   * Returns the libevent flags combination to represent a given TAsyncEventType.
   *
   * All event types map to edge-triggered (EV_ET) libevent events.
   */
  short libeventEventType(TAsyncEventType type) {
    final switch (type) {
      case TAsyncEventType.READ:
        return cast(short)(EV_READ | EV_ET);
      case TAsyncEventType.WRITE:
        return cast(short)(EV_WRITE | EV_ET);
    }
  }
}
462