ParallelExtensionsExtras Tour – #1 – LINQ to Tasks

Stephen Toub - MSFT

(The full set of ParallelExtensionsExtras Tour posts is available here.) 

The .NET Framework developer center provides a concise description of Language Integrated Query (LINQ):

LINQ is a set of extensions to the .NET Framework that encompass language-integrated query, set, and transform operations. It extends C# and Visual Basic with native language syntax for queries and provides class libraries to take advantage of these capabilities.

These “operations” are more of a pattern than anything else, in that there isn’t just one implementation of them.  Rather, any framework that wishes to follow the structure set forth by LINQ can do so, implementing the operators in a manner that befits the nature of the framework.  There is a primary implementation of these LINQ operators against any IEnumerable<T> (an implementation typically referred to as LINQ to Objects), but that’s just the beginning.  There’s a parallelized implementation of LINQ to Objects in the form of Parallel LINQ to Objects, or PLINQ.  There’s an implementation that targets SQL Server databases, known as LINQ to SQL.  There’s an implementation that targets IObservable<T> known as Reactive Extensions.  And many more.  What’s more, the C# and Visual Basic language syntax for these queries targets the aforementioned pattern rather than a specific implementation of that pattern, and as a result, it works with most of these providers in an easily pluggable manner.

ParallelExtensionsExtras includes an implementation of a subset of the LINQ operators.  This implementation targets System.Threading.Tasks.Task instances rather than enumerables or observables or whatever other source.  In doing so, it enables usage of the language syntax for writing asynchronous code, in very much the same manner that Reactive Extensions does, albeit in a much more limited form.

Let’s start by looking at a very simple query against enumerables:

IEnumerable<string> result = from item in enumerable

                             select item.ToString();

 

The C# compiler compiles this query to the equivalent of the following:

IEnumerable<string> result = enumerable.Select<int,string>(

    (int item) => return item.ToString());

 

This code utilizes the LINQ Select method, and specifically the following override:

public static IEnumerable<TResult> Select<TSource, TResult>(

    this IEnumerable<TSource> source,

    Func<TSource, TResult> selector);

 

This is the specific implementation of the select pattern for IEnumerable<T>, but any type can be substituted in for it (for the “X” in the below signature):

public static X<TResult> Select<TSource, TResult>(

    this X<TSource> source,

    Func<TSource, TResult> selector);

 

As long as a type has an accessible method (or extension method) that fits this pattern, the C# compiler may bind to it.  So, we can substitute “Task” for “X”, giving us:

public static Task<TResult> Select<TSource, TResult>(

    this Task<TSource> source,

    Func<TSource, TResult> selector);

 

Now we just need to figure out how to implement this method.  Given a Task<TSource> source, we need to get its result of type TSource, run it through a selector function to produce a TResult, and a return a new Task<TResult> that will provide the resulting value.  Ideally, we’d do this in an asynchronous manner, such that the resulting Task<TResult> would represent the asynchronous processing of the selector function with the result of the input Task<TSource> once it had completed.  As it turns out, this is trivial to do with Task’s ContinueWith method:

public static Task<TResult> Select<TSource, TResult>(

    this Task<TSource> source,

    Func<TSource, TResult> selector)

{

    return source.ContinueWith(t => selector(t.Result));

}

 

By implementing just that one extension method, we can now successful write the following C# code:

Task<string> result = from x in Task.Factory.StartNew(

                          () => ProduceInt())

                      select x.ToString();

 

That alone isn’t all that interesting.  What if I wanted to chain multiple tasks together, for example:

Task<string> result = from x in Task.Factory.StartNew(

                          () => ProduceInt())

                      from y in Task.Factory.StartNew(

                          () => Process(x))

                      select y.ToString();

 

Now I’m looking to asynchronous execute ProduceInt() to give me an integral value x.  Then I want to asynchronously process x and produce a new value y, at which point I want to asynchronously process y by getting its string representation, and I want a resulting Task<string> to represent that.  This pattern of multiple from clauses is handled by the C# compiler with the SelectMany operator.  Here is the relevant signature of SelectMany for enumerables:

public static IEnumerable<TResult> SelectMany

        <TSource, TCollection, TResult>(

    this IEnumerable<TSource> source,

    Func<TSource, IEnumerable<TCollection>> collectionSelector,

    Func<TSource, TCollection, TResult> resultSelector);

 

Given a source, this method runs the collectionSelector function for each item in the source, producing an enumerable for each item.  That item and its resulting enumerable are then provided to a resultSelector function, which produces the final result that’s yielded as part of SelectMany’s output enumerable.  The same thing is possible for Task, again by substituting “Task” anywhere we see “IEnumerable”:

public static Task<TResult> SelectMany

        <TSource, TCollection, TResult>(

    this Task<TSource> source,

    Func<TSource, Task<TCollection>> collectionSelector,

    Func<TSource, TCollection, TResult> resultSelector);

 

This method is a bit more complicated to implement, but it is still just a few lines of code by taking advantage of ContinueWith:

public static Task<TResult> SelectMany

        <TSource, TCollection, TResult>(

    this Task<TSource> source,

    Func<TSource, Task<TCollection>> collectionSelector,

    Func<TSource, TCollection, TResult> resultSelector)

{

    return source.ContinueWith(t =>

    {

        Task<TCollection> ct = collectionSelector(t.Result);

        return ct.ContinueWith(_ =>

            resultSelector(t.Result, ct.Result));

    }).Unwrap();

}

 

First, we continue from the source to signal us when it’s completed.  At that point, we run the collectionSelector with the source’s result in order to produce the intermediate Task<Collection>.  When that intermediate task has completed (which we again know by using ContinueWith), we run the result selector over the two previous results to produce the final value.  Of course, this result of the second ContinueWith call (which produces a Task<TResult>) is being returned as a the result of the first ContinueWith call, such that the outer ContinueWith is actually returning a Task<Task<TResult>>.  But SelectMany needs to return just a Task<TResult>.  The Unwrap method provided by the Task Parallel Library handles this conversion, providing a Task<TResult> that asynchronously represents the combination of the outer and inner tasks from the Task<Task<TResult>>. With that in place, we can successfully write the desired query with two from clauses.

I’ve omitted some details from these implementations, but you get the basic jist.  The LinqToTasks.cs file in ParallelExtensionsExtras provides a set of more complete implementations, covering Select, SelectMany, Where, Join, GroupJoin, GroupBy, OrderBy, and more.  How useful this LINQ implementation is in practice is arguable, but at the very least it provides for an interesting thought exercise as well as a set of examples for doing some complicated asynchronous logic with ContinueWith.

Enjoy!

0 comments

Discussion is closed.

Feedback usabilla icon