/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * Defines the interface used for client-side handling of asynchronous
 * I/O operations, based on coroutines.
 *
 * The main piece of the »client side« (e.g. for TAsyncClient users) of the
 * API is TFuture, which represents an asynchronously executed operation,
 * which can have a return value, throw exceptions, and which can be waited
 * upon.
 *
 * On the »implementation side«, the idea is that by using a TAsyncTransport
 * instead of a normal TTransport and executing the work through a
 * TAsyncManager, the same code as for synchronous I/O can be used for
 * asynchronous operation as well, for example:
 *
 * ---
 * auto socket = new TAsyncSocket(someTAsyncSocketManager(), host, port);
 * // …
 * socket.asyncManager.execute(socket, {
 *   SomeThriftStruct s;
 *
 *   // Waiting for socket I/O will not block an entire thread but cause
 *   // the async manager to execute another task in the meantime, because
 *   // we are using TAsyncSocket instead of TSocket.
 *   s.read(socket);
 *
 *   // Do something with s, e.g. set a TPromise result to it.
 *   writeln(s);
 * });
 * ---
 */
module thrift.async.base;

import core.time : Duration, dur;
import std.socket/+ : Socket+/; // DMD @@BUG314@@
import thrift.base;
import thrift.transport.base;
import thrift.util.cancellation;

/**
 * Manages one or more asynchronous transport resources (e.g. sockets in the
 * case of TAsyncSocketManager) and allows work items to be submitted for them.
 *
 * Implementations will typically run one or more background threads for
 * executing the work, which is one of the reasons for a TAsyncManager to be
 * used. Each work item is run in its own fiber and is expected to yield() away
 * while waiting for time-consuming operations.
 *
 * The second important purpose of TAsyncManager is to serialize access to
 * the transport resources – without taking care of that, e.g. issuing multiple
 * RPC calls over the same connection in rapid succession would likely lead to
 * more than one request being written at the same time, causing only garbage
 * to arrive at the remote end.
 *
 * All methods are thread-safe.
 */
interface TAsyncManager {
  /**
   * Submits a work item to be executed asynchronously.
   *
   * Access to async transports is serialized – if two work items associated
   * with the same transport are submitted, the second delegate will not be
   * invoked until the first has returned, even if the first context-switches
   * away (because it is waiting for I/O) and the async manager is idle
   * otherwise.
   *
   * Optionally, a TCancellation instance can be specified. If present,
   * triggering it will be considered a request to cancel the work item, if it
   * is still waiting for the associated transport to become available.
   * Delegates which are already being processed (i.e. waiting for I/O) are not
   * affected because this would bring the connection into an undefined state
   * (as probably a half-written request or a half-read response would be left
   * behind).
   *
   * Params:
   *   transport = The TAsyncTransport the work delegate will operate on. Must
   *     be associated with this TAsyncManager instance (checked by the
   *     in-contract below).
   *   work = The operations to execute on the given transport. Must never
   *     throw, errors should be handled in another way. nothrow semantics are
   *     difficult to enforce in combination with fibers though, so currently
   *     exceptions are just swallowed by TAsyncManager implementations.
   *   cancellation = If set, can be used to request cancellation of this work
   *     item if it is still waiting to be executed.
   *
   * Note: The work item will likely be executed in a different thread, so make
   *   sure the code it relies on is thread-safe. The async transports
   *   themselves are an exception, as access to them is serialized as noted
   *   above.
   */
  void execute(TAsyncTransport transport, void delegate() work,
    TCancellation cancellation = null
  ) in {
    assert(transport.asyncManager is this,
      "The given transport must be associated with this TAsyncManager.");
  }

  /**
   * Submits a delegate to be executed after a certain amount of time has
   * passed.
   *
   * The actual amount of time elapsed can be higher if the async manager
   * instance is busy and thus should not be relied on.
   *
   * Params:
   *   duration = The amount of time to wait before starting to execute the
   *     work delegate.
   *   work = The code to execute after the specified amount of time has passed.
   *
   * Example:
   * ---
   * // A very basic example – usually, the actual work item would enqueue
   * // some async transport operation.
   * auto asyncManager = someAsyncManager();
   *
   * TFuture!int calculate() {
   *   // Create a promise and asynchronously set its value after three
   *   // seconds have passed.
   *   auto promise = new TPromise!int;
   *   asyncManager.delay(dur!"seconds"(3), {
   *     promise.succeed(42);
   *   });
   *
   *   // Immediately return it to the caller.
   *   return promise;
   * }
   *
   * // This will wait until the result is available and then print it.
   * writeln(calculate().waitGet());
   * ---
   */
  void delay(Duration duration, void delegate() work);

  /**
   * Shuts down all background threads or other facilities that might have
   * been started in order to execute work items. This function is typically
   * called during program shutdown.
   *
   * If there are still tasks to be executed when the timeout expires, any
   * currently executed work items will never receive any notifications
   * for async transports managed by this instance, queued work items will
   * be silently dropped, and implementations are allowed to leak resources.
   *
   * Params:
   *   waitFinishTimeout = If positive, waits for all work items to be
   *     finished for the specified amount of time, if negative, waits for
   *     completion without ever timing out, if zero, immediately shuts down
   *     the background facilities. Defaults to a negative duration, i.e.
   *     waiting without a timeout.
   *
   * NOTE(review): The meaning of the bool result is not specified by this
   *   interface; presumably it reports whether all work items finished
   *   before shutdown – confirm against the implementations.
   */
  bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1));
}

/**
 * A TTransport which uses a TAsyncManager to schedule non-blocking operations.
 *
 * The actual type of device is not specified; typically, implementations will
 * depend on an interface derived from TAsyncManager to be notified of changes
 * in the transport state.
 *
 * The peeking, reading, writing and flushing methods must always be called
 * from within the associated async manager.
 */
interface TAsyncTransport : TTransport {
  /**
   * The TAsyncManager associated with this transport.
   */
  TAsyncManager asyncManager() @property;
}

/**
 * A TAsyncManager providing notifications for socket events.
 */
interface TAsyncSocketManager : TAsyncManager {
  /**
   * Adds a listener that is triggered once when an event of the specified type
   * occurs, and removed afterwards.
   *
   * Params:
   *   socket = The socket to listen for events at.
   *   eventType = The type of the event to listen for.
   *   timeout = The period of time after which the listener will be called
   *     with TAsyncEventReason.TIMED_OUT if no event happened.
   *   listener = The delegate to call when an event happened.
   */
  void addOneshotListener(Socket socket, TAsyncEventType eventType,
    Duration timeout, TSocketEventListener listener);

  /// Ditto
  void addOneshotListener(Socket socket, TAsyncEventType eventType,
    TSocketEventListener listener);
}

/**
 * Types of events that can happen for an asynchronous transport.
 */
enum TAsyncEventType {
  READ, /// New data became available to read.
  WRITE /// The transport became ready to be written to.
}

/**
 * The type of the delegates used to register socket event handlers.
 */
alias void delegate(TAsyncEventReason callReason) TSocketEventListener;

/**
 * The reason a listener was called.
 */
enum TAsyncEventReason : byte {
  NORMAL, /// The event listened for was triggered normally.
  TIMED_OUT /// A timeout for the event was set, and it expired.
}