June 27th, 2011

Using Tasks to implement the APM Pattern

Stephen Toub - MSFT
Partner Software Engineer

Several times recently, folks have asked how to use tasks to implement the APM pattern, otherwise known as the Asynchronous Programming Model pattern, the IAsyncResult pattern, or the Begin/End pattern.  While moving forward we encourage folks to use a Task-based pattern for exposing asynchronous operations, the APM pattern has been the prevalent pattern for asynchrony since the .NET Framework’s initial release, and thus there is a fair amount of infrastructure built up around consuming APM implementations.  Given how common Task-based APIs will be moving forward, it makes sense that developers want to implement the APM pattern with Tasks.  In this post we examine how to do so in .NET 4.

Consider that we have a method which returns a Task<int>:

static Task<int> FooAsync();

and we want to wrap it in an APM implementation with the signatures:

static IAsyncResult BeginFoo(AsyncCallback callback, object state);
static int EndFoo(IAsyncResult asyncResult);

Implementing BeginFoo is the most interesting piece, so we’ll start there. The hardest part of implementing the begin method is implementing the IAsyncResult interface.  Luckily, Task already does this, so the hardest part is out of the way.  The basic additional semantics we need to encode are:

  1. When the task completes, invoke the provided AsyncCallback.
  2. Pass to the invoked AsyncCallback an IAsyncResult, the same IAsyncResult that’s returned from BeginFoo.
  3. Ensure that the IAsyncResult’s AsyncState property returns the supplied state object.

Notice that none of these three requirements is specific to BeginFoo; they are general enough to apply to any Task<TResult> and any BeginXx method, so we’ll build this logic into a reusable ToApm extension method:

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state);

Let’s start with #3 above.  We need a Task<TResult> object whose IAsyncResult.AsyncState property returns the supplied state.  Task lets you configure this value when the task is initially constructed, but after that the task is meant to be observationally pure, so its AsyncState can’t be changed.  As such, we’ll create a new task to represent the operation, using the TaskCompletionSource<TResult> type from the System.Threading.Tasks namespace.  TaskCompletionSource serves two purposes: to create the task instance, and to provide the methods necessary to manually complete that task instance in one of the three final states (RanToCompletion, Faulted, or Canceled).

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
{
    var tcs = new TaskCompletionSource<TResult>(state);
 
    …
    return tcs.Task;
}

We need this newly created task to mirror the results of the supplied task, which means that when the supplied task completes, we need to transfer its completion state over to this newly created task:

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
{
    var tcs = new TaskCompletionSource<TResult>(state); 

    task.ContinueWith(delegate
    {
        if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
        else if (task.IsCanceled) tcs.TrySetCanceled();
        else tcs.TrySetResult(task.Result);
    }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default); 

    return tcs.Task;
}

Now, for #1 and #2 above, when the task completes, we also need to invoke the callback, passing in the IAsyncResult.  We already have a continuation in place, so this part is easy:

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
{
    var tcs = new TaskCompletionSource<TResult>(state);

    task.ContinueWith(delegate
    {
        if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
        else if (task.IsCanceled) tcs.TrySetCanceled();
        else tcs.TrySetResult(task.Result);

        if (callback != null) callback(tcs.Task);

    }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);

    return tcs.Task;
}

And that’s it!  We can now easily implement our BeginFoo method:

static IAsyncResult BeginFoo(AsyncCallback callback, object state)
{
    return FooAsync().ToApm(callback, state);
}
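Before moving on to EndFoo, the three requirements listed earlier can be sanity-checked with a small sketch. The names ToApmRequirementsCheck and Check, the TaskCompletionSource<int> standing in for a real operation, and the value 7 are assumptions made purely for illustration; ToApm itself is repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class TaskApmExtensions
{
    // ToApm as built up in this post, repeated so the sketch is self-contained.
    public static Task<TResult> ToApm<TResult>(
        this Task<TResult> task, AsyncCallback callback, object state)
    {
        var tcs = new TaskCompletionSource<TResult>(state);
        task.ContinueWith(delegate
        {
            if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
            else if (task.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(task.Result);

            if (callback != null) callback(tcs.Task);
        }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
        return tcs.Task;
    }
}

static class ToApmRequirementsCheck
{
    // Returns true when all three requirements hold for one completed operation.
    public static bool Check()
    {
        var state = new object();                      // hypothetical caller state
        var source = new TaskCompletionSource<int>();  // hypothetical underlying operation
        IAsyncResult passedToCallback = null;

        using (var callbackRan = new ManualResetEventSlim())
        {
            IAsyncResult returned = source.Task.ToApm(ar =>
            {
                passedToCallback = ar;
                callbackRan.Set();
            }, state);

            source.SetResult(7);   // complete the underlying task
            callbackRan.Wait();    // requirement 1: the callback was invoked

            return ReferenceEquals(passedToCallback, returned)   // requirement 2
                && ReferenceEquals(returned.AsyncState, state);  // requirement 3
        }
    }
}
```

Completing the source only after ToApm has returned keeps the comparison free of races between the callback and the assignment of the returned IAsyncResult.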

For our EndFoo method, we simply need to cast the supplied IAsyncResult back to a Task<TResult>, wait for it to complete, and return its result.  Task<TResult>’s Result property handles all of that: it blocks until the task completes, and it throws an exception if the task faulted or was canceled.  We can therefore write the EndFoo method simply as:

static int EndFoo(IAsyncResult asyncResult)
{
    return ((Task<int>)asyncResult).Result;
}

Of course there’s more we could do here in terms of error conditions (e.g. throwing a better exception than an InvalidCastException if the supplied asyncResult wasn’t a Task<int>), but our basic implementation is complete.
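As one illustration of that extra error handling, here is a minimal sketch of an EndFoo that validates its argument first. The class name FooApmChecked and the exception messages are assumptions made for the example. Note that the type check can only confirm the argument is a Task<int>, not that it actually came from BeginFoo.

```csharp
using System;
using System.Threading.Tasks;

static class FooApmChecked
{
    // Sketch of an EndFoo that validates its argument before reading Result.
    public static int EndFoo(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
            throw new ArgumentNullException("asyncResult");

        // "as" yields null instead of throwing InvalidCastException on a mismatch.
        var task = asyncResult as Task<int>;
        if (task == null)
            throw new ArgumentException(
                "Expected the IAsyncResult returned by BeginFoo.", "asyncResult");

        // Blocks until completion; failures surface wrapped in an AggregateException.
        return task.Result;
    }
}
```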

There are variations on these that we could employ as well.  For example, the EndFoo method above propagates an AggregateException that wraps the one or more exceptions stored in the Task<int> if the task faulted or was canceled.  Propagating an AggregateException serves two purposes: first and foremost, it avoids the loss of stack trace and Watson bucket information that rethrowing the original exception would cause, and second, it enables more than one exception from the task to be propagated.  However, if you would rather propagate the original exception, that can certainly still be done, e.g.

static int EndFoo(IAsyncResult asyncResult)
{
    try
    {
        return ((Task<int>)asyncResult).Result;
    }
    // Rethrows only the first inner exception; throwing it here resets its stack trace.
    catch (AggregateException ae) { throw ae.InnerException; }
}
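This post targets .NET 4, but as a hedged aside: if you can assume .NET 4.5 or later, ExceptionDispatchInfo from System.Runtime.ExceptionServices lets you rethrow the original exception while keeping its stack trace. The sketch below is one possible variation under that assumption, and the class name FooApmRethrow is made up for the example.

```csharp
using System;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

static class FooApmRethrow
{
    public static int EndFoo(IAsyncResult asyncResult)
    {
        try
        {
            return ((Task<int>)asyncResult).Result;
        }
        catch (AggregateException ae)
        {
            // Assumes .NET 4.5+: rethrows the first inner exception
            // without discarding the stack trace it already carries.
            ExceptionDispatchInfo.Capture(ae.InnerException).Throw();
            throw; // Never reached; keeps the compiler's return-path analysis happy.
        }
    }
}
```

As with the plain rethrow, only the first inner exception is propagated, so any additional exceptions stored in the task are not seen by the caller.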

There are also further optimizations we could employ for certain use cases.  For example, if the object state provided by the caller is the same state that is stored in the supplied task (e.g. if the task was created with an overload of Task.Factory.StartNew that accepts object state), then there’s no reason to create a new task.  And in that case, if there is no callback provided, there’s no reason to have a continuation at all:

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
{
    if (task.AsyncState == state)
    {
        if (callback != null)
        {
            task.ContinueWith(delegate { callback(task); },
                CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
        }
        return task;
    }

    var tcs = new TaskCompletionSource<TResult>(state);
    task.ContinueWith(delegate
    {
        if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
        else if (task.IsCanceled) tcs.TrySetCanceled();
        else tcs.TrySetResult(task.Result);

        if (callback != null) callback(tcs.Task);

    }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default); 
    return tcs.Task;
}

With this ToApm function in hand, you can now easily convert your Task-based APIs into APM APIs in order to support integration with consumers of the old pattern.
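To close the loop, here is a hedged sketch of what consuming the resulting APM pair might look like, first with a plain AsyncCallback and then by handing BeginFoo and EndFoo to Task<int>.Factory.FromAsync. The FooAsync body (a thread-pool task that returns 42), the class names, and the "my-state" string are stand-ins invented for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class TaskApmExtensions
{
    // ToApm as built up in this post, repeated so the sketch is self-contained.
    public static Task<TResult> ToApm<TResult>(
        this Task<TResult> task, AsyncCallback callback, object state)
    {
        var tcs = new TaskCompletionSource<TResult>(state);
        task.ContinueWith(delegate
        {
            if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
            else if (task.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(task.Result);

            if (callback != null) callback(tcs.Task);
        }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
        return tcs.Task;
    }
}

static class FooApmDemo
{
    // Hypothetical stand-in for FooAsync: computes 42 on the thread pool.
    static Task<int> FooAsync()
    {
        return Task.Factory.StartNew(() => 42);
    }

    public static IAsyncResult BeginFoo(AsyncCallback callback, object state)
    {
        return FooAsync().ToApm(callback, state);
    }

    public static int EndFoo(IAsyncResult asyncResult)
    {
        return ((Task<int>)asyncResult).Result;
    }

    // Classic APM consumption: pass a callback and call EndFoo inside it.
    public static string CallWithCallback()
    {
        string observed = null;
        using (var done = new ManualResetEventSlim())
        {
            BeginFoo(ar =>
            {
                observed = (string)ar.AsyncState + ":" + EndFoo(ar);
                done.Set();
            }, "my-state");
            done.Wait(); // block only so the demo can return the observed value
        }
        return observed;
    }

    // Existing APM-consuming infrastructure, such as FromAsync, works unchanged.
    public static int CallWithFromAsync()
    {
        return Task<int>.Factory.FromAsync(BeginFoo, EndFoo, null).Result;
    }
}
```

With these stand-ins, CallWithCallback returns "my-state:42" and CallWithFromAsync returns 42.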

Stephen Toub is a developer on the .NET team at Microsoft.
