November 21st, 2010

Processing Sequences of Asynchronous Operations with Tasks

Stephen Toub - MSFT
Partner Software Engineer

Of late, I’ve seen multiple folks asking about how to use tasks to asynchronously execute a sequence of operations.  For example, given three synchronous functions:

public string DoA(string input);
public string DoB(string aResult);
public string DoC(string bResult);

you could invoke these functions with code like:

string aResult = DoA(input);
string bResult = DoB(aResult);
string cResult = DoC(bResult);

Then, given Task-based asynchronous counterparts to these functions:

public static Task<string> DoAAsync(string input);
public static Task<string> DoBAsync(string aResult);
public static Task<string> DoCAsync(string bResult);

how would you asynchronously do the equivalent of the synchronous code previously shown?

Async CTP and Language Support

The Async CTP highlights a great way to handle this once C# and Visual Basic have built-in support for awaiting tasks.  With that language support, the asynchronous version looks almost identical to the synchronous version, albeit with a few extra keywords thrown in:

string aResult = await DoAAsync(input);
string bResult = await DoBAsync(aResult);
string cResult = await DoCAsync(bResult);

Lovely! Of course, .NET 4 doesn’t currently support that special syntax out-of-the-box, so what can we do in the meantime?  It’s important to realize that the Async CTP just builds on top of what’s available in .NET 4, so while the compiler here is doing what compilers do best and writing lots of boilerplate so that you don’t have to, the generated code is really still just using existing support in the Task Parallel Library.  We can use that support directly, too.

ContinueWith and Unwrap

One approach is to just use what’s provided in TPL without utilizing any additional helpers.  The Task.ContinueWith method schedules code to run when the antecedent task completes, and returns a Task (or Task<TResult>) to represent that subsequent operation.  So, we can try to handle the first chained call with code like the following:

var aResult = DoAAsync(input);
var bResult = aResult.ContinueWith(t => DoBAsync(t.Result));

This isn’t quite right, however.  This overload of ContinueWith creates a Task<TResult>, but the TResult here will be typed as the result of DoBAsync, which is Task<string>.  Thus, the instance returned from the ContinueWith call will be a Task<Task<string>>, which is not what we want.  To handle those nested tasks, we can utilize the Unwrap method included in TPL to “unwrap” the inner nested task, converting the Task<Task<string>> into the Task<string> we desire.  And with that small change, we can now complete our example:

var aResult = DoAAsync(input);
var bResult = aResult.ContinueWith(t => DoBAsync(t.Result)).Unwrap();
var cResult = bResult.ContinueWith(t => DoCAsync(t.Result)).Unwrap();

This approach works, and it may be all you need, but it does have a couple of downsides.  First, what happens if DoAAsync fails such that the returned task is faulted rather than running to completion?  In that case, the continuation off of aResult will still execute, and accessing the t.Result inside that function will propagate the exception.  That will cause bResult to be faulted, and its continuation will fire, with that continuation’s t.Result propagating the exception, and so on.  Net net, cResult will appropriately be faulted, but more work will have happened than was necessary (e.g. we invoked the continuations for B and C when we didn’t have to, and we threw several more exceptions than we needed to), and each time the exception was propagated it was wrapped in another level of AggregateException, so cResult will contain an AggregateException containing an AggregateException containing… and so on (AggregateException’s Flatten method is great for reducing these extra levels of aggregation, by the way).  Second, it’s a bit more code than you may otherwise like to write.  Can we do better?
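Before moving on, here’s a minimal sketch of what that flattening looks like in practice, assuming we block on cResult via its Result property (the try/catch shown here is purely illustrative):

try
{
    string finalValue = cResult.Result; // blocks; throws an AggregateException if the chain faulted
}
catch (AggregateException ae)
{
    // Flatten collapses the AggregateException-within-AggregateException nesting
    // into a single level whose InnerExceptions contain the original failures.
    foreach (var e in ae.Flatten().InnerExceptions)
    {
        Console.WriteLine(e.Message);
    }
}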

Then

One of the really nice things about Tasks is that they enable good composition.  Once you have a single type capable of representing any arbitrary asynchronous operation, you can write “combinators” over the type that allow you to combine/compose asynchronous operations in a myriad of ways.  For example, we can abstract away this sequential combination notion into a Then combinator, and at the same time take care of the previously mentioned exception handling concerns. Our goal here will be to build the following operator:

public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next);

With that in place, we can then use it to re-implement our desired functionality with code like:

var aResult = DoAAsync(input);
var bResult = aResult.Then(s => DoBAsync(s));
var cResult = bResult.Then(s => DoCAsync(s));

The behavior of “Then” is that when the first task completes, the next function is invoked to produce a task and is provided with the output from the previous task.  The task returned from Then primarily represents the task returned by the next function’s invocation; however, if the first task is canceled or faults, the next function will not be invoked, and the returned task will instead represent the first task.  With this approach, we can chain Then calls together as was done in the above example, and the final task produced from the chain will represent all of the processing.

To implement Then, we first need the scaffolding to do argument validation and to create the task that Then will return:

public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next)
{
    if (first == null) throw new ArgumentNullException("first");
    if (next == null) throw new ArgumentNullException("next");

    var tcs = new TaskCompletionSource<T2>();
    … // TODO #1: implement Then logic
    return tcs.Task;
}

This code validates that the source task and the function to produce the subsequent task are both non-null.  It then creates a TaskCompletionSource<TResult> which is used to return a Task from Then; the rest of our implementation will be all about completing that TCS task with the appropriate state.  Now we just need to fill in that TODO #1:

// in place of TODO #1 above
first.ContinueWith(delegate
{
    if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
    else if (first.IsCanceled) tcs.TrySetCanceled();
    else
    {
        … // TODO #2: handle successful completion of first
    }
}, TaskContinuationOptions.ExecuteSynchronously);

We’re utilizing a continuation off of the first task to run some code when the task completes.  If it completes due to faulting, we transfer its exceptions to the TCS task we’re returning from Then, and we’re done.  Similarly, if the first task was canceled, we cancel the TCS task.  That just leaves a RanToCompletion final state, meaning that the first task completed successfully, and we’ll handle that case in a moment.  The other thing to notice is that we’ve specified that this continuation task should ExecuteSynchronously: that just means that, if possible, the continuation function will run synchronously as part of the first task’s completion, ideally on the same thread immediately after first completes, rather than having TPL schedule the continuation to execute later.  Now let’s complete our implementation by finishing TODO #2:

// in place of TODO #2 above
try
{
    var t = next(first.Result);
    if (t == null) tcs.TrySetCanceled();
    else t.ContinueWith(delegate
    {
        if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
        else if (t.IsCanceled) tcs.TrySetCanceled();
        else tcs.TrySetResult(t.Result);
    }, TaskContinuationOptions.ExecuteSynchronously);
}
catch (Exception exc) { tcs.TrySetException(exc); }

Here we invoke the next function with the result of the first task.  If for some reason next returns null, we’ve opted to cancel the TCS task, but this could be changed to fault it or to do something else entirely, based on your desired semantics.  Assuming a non-null Task is returned, we then continue from that task.  This is done to enable transferring the results from that task to the TCS task since, as mentioned, the TCS task returned from Then should be a proxy for this inner task object.  The rest should look familiar, simply transferring the final state and associated data from this inner task to the TCS task.  Here’s our final implementation of this method:

public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next)
{
    if (first == null) throw new ArgumentNullException("first");
    if (next == null) throw new ArgumentNullException("next");

    var tcs = new TaskCompletionSource<T2>();
    first.ContinueWith(delegate
    {
        if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
        else if (first.IsCanceled) tcs.TrySetCanceled();
        else
        {
            try
            {
                var t = next(first.Result);
                if (t == null) tcs.TrySetCanceled();
                else t.ContinueWith(delegate
                {
                    if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
                    else if (t.IsCanceled) tcs.TrySetCanceled();
                    else tcs.TrySetResult(t.Result);
                }, TaskContinuationOptions.ExecuteSynchronously);
            }
            catch (Exception exc) { tcs.TrySetException(exc); }
        }
    }, TaskContinuationOptions.ExecuteSynchronously);
    return tcs.Task;
}
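Also, as noted earlier, when next returns null we’ve opted to cancel the proxy task.  If you’d rather fault it in that situation, a hypothetical tweak inside the try block might look like the following (the exception type and message are just placeholders):

// in place of tcs.TrySetCanceled() when next returns null
if (t == null) tcs.TrySetException(
    new InvalidOperationException("The next delegate returned a null task."));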

There are of course many variations on this possible, and you could imagine building up several overloads of Then to handle a multitude of cases for creating chains of asynchronous processing, e.g.

public static Task Then(this Task first, Action next);
public static Task Then(this Task first, Func<Task> next);
public static Task<T2> Then<T2>(this Task first, Func<T2> next);
public static Task<T2> Then<T2>(this Task first, Func<Task<T2>> next);

public static Task Then<T1>(this Task<T1> first, Action<T1> next);
public static Task Then<T1>(this Task<T1> first, Func<T1,Task> next);
public static Task<T2> Then<T1,T2>(this Task<T1> first, Func<T1,T2> next);
public static Task<T2> Then<T1,T2>(this Task<T1> first, Func<T1, Task<T2>> next);
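For instance, here’s one possible sketch of the Func<T1,T2> overload.  It follows the same pattern as above, except that next produces its result synchronously, so there’s no inner task to proxy (this is just an illustrative implementation, not the only way to write it):

public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, T2> next)
{
    if (first == null) throw new ArgumentNullException("first");
    if (next == null) throw new ArgumentNullException("next");

    var tcs = new TaskCompletionSource<T2>();
    first.ContinueWith(delegate
    {
        if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
        else if (first.IsCanceled) tcs.TrySetCanceled();
        else
        {
            // next runs synchronously here, so its result (or exception) completes the proxy directly
            try { tcs.TrySetResult(next(first.Result)); }
            catch (Exception exc) { tcs.TrySetException(exc); }
        }
    }, TaskContinuationOptions.ExecuteSynchronously);
    return tcs.Task;
}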

Sequence

With such methods, we can create additional abstractions for sequential processing as well.  Let’s say we did have an implementation for an additional one of the above overloads:

public static Task Then(this Task first, Func<Task> next)
{
    if (first == null) throw new ArgumentNullException("first");
    if (next == null) throw new ArgumentNullException("next");

    var tcs = new TaskCompletionSource<object>();
    first.ContinueWith(delegate
    {
        if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
        else if (first.IsCanceled) tcs.TrySetCanceled();
        else
        {
            try
            {
                var t = next();
                if (t == null) tcs.TrySetCanceled();
                else t.ContinueWith(delegate
                {
                    if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
                    else if (t.IsCanceled) tcs.TrySetCanceled();
                    else tcs.TrySetResult(null);
                }, TaskContinuationOptions.ExecuteSynchronously);
            }
            catch (Exception exc) { tcs.TrySetException(exc); }
        }
    }, TaskContinuationOptions.ExecuteSynchronously);
    return tcs.Task;
}

We can layer a Sequence method on top of this as follows:

public static Task Sequence(params Func<Task> [] actions)
{
    Task last = null;
    foreach (var action in actions)
    {
        last = (last == null) ? Task.Factory.StartNew(action).Unwrap() : last.Then(action);
    }
    return last;
}

and with such a Sequence method, we’re now able to write a set of lambdas that will be processed asynchronously one after the other, e.g.

Task<string> aResult = null, bResult = null, cResult = null;
Sequence(
    () => { return aResult = DoAAsync(input); },
    () => { return bResult = DoBAsync(aResult.Result); },
    () => { return cResult = DoCAsync(bResult.Result); });
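The Task returned from Sequence represents the entire chain, so one hypothetical way to consume the outcome is to hook a continuation off of it, e.g.:

// Hypothetical usage: run a continuation once the whole chain has run to completion.
Task whole = Sequence(
    () => { return aResult = DoAAsync(input); },
    () => { return bResult = DoBAsync(aResult.Result); },
    () => { return cResult = DoCAsync(bResult.Result); });
whole.ContinueWith(_ => Console.WriteLine(cResult.Result),
    TaskContinuationOptions.OnlyOnRanToCompletion);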

Iterate

We can build other interesting combinators for sequential processing, including one that takes advantage of C# iterators.  There are several popular frameworks that use C# iterators to drive asynchronous processing, and the same trick employed in those can be used with tasks.  Here’s a basic example of such a driver:

public static Task Iterate(IEnumerable<Task> asyncIterator)
{
    if (asyncIterator == null) throw new ArgumentNullException("asyncIterator");

    var enumerator = asyncIterator.GetEnumerator();
    if (enumerator == null) throw new InvalidOperationException("Invalid enumerable - GetEnumerator returned null");

    var tcs = new TaskCompletionSource<object>();
    tcs.Task.ContinueWith(_ => enumerator.Dispose(), TaskContinuationOptions.ExecuteSynchronously);

    Action<Task> recursiveBody = null;
    recursiveBody = delegate {
        try {
            if (enumerator.MoveNext()) enumerator.Current.ContinueWith(recursiveBody, TaskContinuationOptions.ExecuteSynchronously);
            else tcs.TrySetResult(null);
        }
        catch (Exception exc) { tcs.TrySetException(exc); }
    };

    recursiveBody(null);
    return tcs.Task;
}

This Iterate method accepts an enumerable of tasks.  This enumerable needs to be lazy, such that the next task isn’t generated until MoveNext is called to retrieve it; this is exactly the behavior we’ll generally get with a C# iterator (and a VB iterator in Async CTP, but if you have the Async CTP, you should just use await as called out at the beginning of this post).  The Iterate method retrieves an enumerator and begins by calling MoveNext on it.  The task returned from the enumerator as Current then has a continuation hooked up to it which, when the retrieved task completes, starts that process again, calling MoveNext on the enumerator, hooking up a continuation, and so on.  Only once MoveNext throws an exception or returns false does this process end.  With such an Iterate method in place, we can now rewrite our original sample with a method like this:

IEnumerable<Task> DoExample(string input)
{
    var aResult = DoAAsync(input);
    yield return aResult;

    var bResult = DoBAsync(aResult.Result);
    yield return bResult;

    var cResult = DoCAsync(bResult.Result);
    yield return cResult;

    …
}

which we then invoke with code like:

Task t = Iterate(DoExample("42"));

There are of course many variations on the Iterate implementation you could easily add.  For example, you might want the iteration to stop if a retrieved task ends in the faulted state; that can be accomplished with some minor modifications to the recursiveBody function.
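As a rough sketch of one such (hypothetical) modification, recursiveBody could inspect the task it was just handed before moving on:

// Variant of recursiveBody: stop iterating (and fault the returned task)
// if the previously yielded task ended in the Faulted state.
recursiveBody = antecedent => {
    try {
        if (antecedent != null && antecedent.IsFaulted)
            tcs.TrySetException(antecedent.Exception.InnerExceptions);
        else if (enumerator.MoveNext())
            enumerator.Current.ContinueWith(recursiveBody, TaskContinuationOptions.ExecuteSynchronously);
        else
            tcs.TrySetResult(null);
    }
    catch (Exception exc) { tcs.TrySetException(exc); }
};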

LINQ

Getting slightly more wacky (but still within the realm of reasonable), we can utilize the C# and VB LINQ syntax to also express sequential processing.  The C# and VB compilers are pattern-based in their support of the .NET standard query operators, allowing you to use from, select, and so on with arbitrary data types as long as methods matching the right signatures are exposed.  We can, for example, implement an appropriate SelectMany operator as an extension method on top of Task<TResult>:

static class Extensions
{
    public static Task<TResult> SelectMany<TSource, TCollection, TResult>(
        this Task<TSource> source,
        Func<TSource, Task<TCollection>> collectionSelector,
        Func<TSource, TCollection, TResult> resultSelector)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (collectionSelector == null) throw new ArgumentNullException("collectionSelector");
        if (resultSelector == null) throw new ArgumentNullException("resultSelector");

        return source.ContinueWith(t =>
        {
            return collectionSelector(t.Result).
                ContinueWith(c => resultSelector(t.Result, c.Result), TaskContinuationOptions.NotOnCanceled);
        }, TaskContinuationOptions.NotOnCanceled).Unwrap();
    }
}

and with that, we can then re-implement our original example with code like:

var result = from aResult in DoAAsync("42")
             from bResult in DoBAsync(aResult)
             from cResult in DoCAsync(bResult)
             select cResult;

An implementation of a bunch of the LINQ operators targeting tasks is available as part of the Parallel Extensions Extras project at https://code.msdn.microsoft.com/ParExtSamples.  If we instead use the Reactive Extensions, available for download from the DevLabs site, we can take advantage of the ToObservable extension method it provides for Task<TResult> and then rely on Rx’s robust LINQ implementation to do the same thing, e.g.

var result = from aResult in DoAAsync("42").ToObservable()
             from bResult in DoBAsync(aResult).ToObservable()
             from cResult in DoCAsync(bResult).ToObservable()
             select cResult;

F# Async Workflows

Out-of-the-box in Visual Studio 2010, F# supports asynchronous workflows and has built-in capability for awaiting tasks.  This allows you to write the original example with code like:

let example = async {
    let! aResult = DoAAsync(input) |> Async.AwaitTask
    let! bResult = DoBAsync(aResult) |> Async.AwaitTask
    let! cResult = DoCAsync(bResult) |> Async.AwaitTask
    …
}

Summary

As you can see, there are many useful and interesting ways to achieve sequential, asynchronous processing using Tasks.  Eventually, when built-in language support for awaiting tasks is available, that will be the easiest and recommended approach for developing asynchronous code of this nature.  In the meantime, you can try out such support in the Async CTP, and you can use other techniques as described in this blog post to achieve similar kinds of processing today, assured that you’ll be able to update that code in the future with the more powerful language capabilities when they’re available.

Happy coding!

Author

Stephen Toub - MSFT
Partner Software Engineer

Stephen Toub is a developer on the .NET team at Microsoft.
