June 30th, 2009

Asynchronous methods, C# iterators, and Tasks

Stephen Toub - MSFT
Partner Software Engineer

More and more, developers are realizing the significant scalability advantages that asynchronous programming can provide, especially as it relates to I/O.

Consider an application that needs to copy data from one stream to another, as in the following synchronous implementation:

static void CopyStreamToStream(Stream input, Stream output)
{
    // Buffer space for the data to be read and written
    byte [] buffer = new byte[0x2000];

    // While there’s data to be read and written
    while(true)
    {
        // Read data. If we weren’t able to read any, bail.
        // Otherwise, write it out and start over again.
        int numRead = input.Read(buffer, 0, buffer.Length);
        if (numRead == 0) break;
        output.Write(buffer, 0, numRead);
    }
}

In many cases like this, such as when the Streams are FileStream instances representing files on disk or NetworkStream instances representing remote data, the operations being performed require little to no computational power; the majority of the time is spent waiting on the I/O subsystems and devices. If only one such operation is in progress at a time, that waiting isn’t a big deal. But in a synchronous implementation like the one above, the waiting blocks a thread, leaving that thread unable to do anything else in the meantime. Threads by default take up a sizeable chunk of memory as well as kernel resources, so if multiple calls to CopyStreamToStream execute concurrently, multiple threads may sit idle. Because threads are this expensive, we try to limit how many exist in an application at any one time, such as by using a thread pool, and this synchronous style of I/O programming can therefore lead to scalability bottlenecks, especially in server components where we want to process as many user requests concurrently as the machine’s resources allow.

One solution to this problem is compiler support: a compiler could recognize this synchronous pattern and translate it into an asynchronous one. Such a transformation of the same snippet might look like the following (note that this is hand-written rather than the actual output of any particular compiler; it’s my attempt to write the transformation out concisely while keeping it human-understandable):

static void CopyStreamToStreamAsync(
    Stream input, Stream output, Action<Exception> completed)
{
    // Buffer space for the data to be read and written
    byte[] buffer = new byte[0x2000];

    // The read/write loop. The parameter is the IAsyncResult of the
    // last read operation, which still needs to be completed with a call
    // to EndRead. If the parameter is null, a new read needs
    // to be started.
    Action<IAsyncResult> readWriteLoop = null;
    readWriteLoop = iar =>
    {
        try
        {
            // Determine whether to start with a BeginRead or 
            // an EndRead/BeginWrite, based on whether iar is null.
            // Then, as long as the loop continues, alternate between
            // reading and writing.
            for (bool isRead = iar == null; ; isRead = !isRead)
            {
                switch (isRead)
                {
                    // Do BeginRead(…)
                    case true:
                        // Start the asynchronous read
                        iar = input.BeginRead(buffer, 0, buffer.Length, readResult =>
                        {
                            // If the read completed synchronously, immediately
                            // return from the callback, as the processing of the
                            // read will be handled synchronously
                            // from the same thread that called BeginRead
                            if (readResult.CompletedSynchronously) return;

                            // The read completed asynchronously, so we need 
                            // to run the processing loop,
                            // starting with EndRead/BeginWrite.
                            readWriteLoop(readResult);
                        }, null);

                        // If the read is completing asynchronously, bail, as
                        // there’s nothing more to do.
                        // If it completed synchronously, loop around to do
                        // the EndRead/BeginWrite synchronously.
                        if (!iar.CompletedSynchronously) return;
                        break;

                    // Do BeginWrite(…)
                    case false:
                        // Complete the previous read. If there’s no more data
                        // to be read/written, bail.
                        int numRead = input.EndRead(iar);
                        if (numRead == 0)
                        {
                            completed(null);
                            return;
                        }

                        // Now that we know how much data was read, write it
                        // out asynchronously
                        iar = output.BeginWrite(buffer, 0, numRead, writeResult =>
                        {
                            // If the write completed synchronously, allow
                            // the thread that called BeginWrite
                            // to handle it.
                            if (writeResult.CompletedSynchronously) return;

                            // Otherwise, complete the asynchronous write. This
                            // callback runs outside the loop’s try/catch, so
                            // any failure from EndWrite must be reported here.
                            try { output.EndWrite(writeResult); }
                            catch (Exception ex) { completed(ex); return; }

                            // Launch the read/write loop to continue all over again.
                            readWriteLoop(null);
                        }, null);

                        // If the write is completing asynchronously, bail,
                        // as there’s nothing more to do.
                        // Otherwise, complete the write synchronously and
                        // loop around.
                        if (!iar.CompletedSynchronously) return;
                        output.EndWrite(iar);
                        break;
                }
            }
        }
        catch (Exception e) { completed(e); }
    };

    // Start the whole process off with a read.
    readWriteLoop(null);
}

The Axum compiler, available on DevLabs, is actually capable of these kinds of transformations for asynchronous programming, and you could imagine such functionality being baked into a mainstream language like C#. Here’s how the code could be written asynchronously with Axum:

static asynchronous void CopyStreamToStream(Stream input, Stream output)
{
    byte [] buffer = new byte[0x2000];
    while(true)
    {
        int numRead = input.Read(buffer, 0, buffer.Length);
        if (numRead == 0) break;
        output.Write(buffer, 0, numRead);
    }
}

That’s quite lovely. By comparison, the hand-written version is mind-numbing, and it’s not something you want to write every time you need to perform a repeating operation like this asynchronously.

As such, some developers have started to take advantage of C# iterators for writing asynchronous code. While not originally designed for this purpose, the compiler transformations employed for C# iterators are similar to what’s necessary for writing asynchronous code, and thus with a bit of library-based support, it’s possible to write an iterator that looks sequential but that takes advantage of asynchrony. Several libraries have been based on this approach, including the Concurrency & Coordination Runtime (CCR) from Microsoft Robotics, Jeffrey Richter’s AsyncEnumerator, and others.
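The property of C# iterators that makes this possible is that the compiler rewrites an iterator method into a state machine: each call to MoveNext runs the method body only up to the next yield return, and the following call resumes right after that point. The small sketch below, using the illustrative names Steps and log, records how the caller and the iterator body interleave. An asynchronous driver would make those same MoveNext calls from completion callbacks instead of from a loop.

```csharp
using System;
using System.Collections.Generic;

// Records the order in which the caller and the iterator body run.
var log = new List<string>();

// An ordinary iterator: the compiler turns it into a state machine,
// so each MoveNext() runs the body only up to the next yield return.
IEnumerable<int> Steps()
{
    log.Add("body: before first yield");
    yield return 1;
    log.Add("body: resumed after first yield");
    yield return 2;
    log.Add("body: resumed after second yield, finishing");
}

using (IEnumerator<int> e = Steps().GetEnumerator())
{
    // Here the caller decides when to resume the body; an async driver
    // would instead call MoveNext() when an operation completes.
    while (e.MoveNext())
    {
        log.Add($"caller: got {e.Current}");
    }
}

foreach (string entry in log) Console.WriteLine(entry);
```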

The key to taking advantage of this pattern is yielding something from an iterator that can invoke a callback when an operation completes. The pattern then becomes:

IEnumerable<ThingThatHasCallbackWhenCompletes> AsyncMethod()
{
    …
    yield return SomethingThatReturnsThingThatHasCallbackWhenCompletes();
    … // code here executes when the 
        // yielded ThingThatHasCallbackWhenCompletes completes
}

The idea is that the iterator method returns an IEnumerable of instances, each representing one piece of an asynchronous operation. A utility function invokes the iterator method and iterates over the resulting enumerator. Each time the utility function gets the next instance from the enumerator, it registers code to be notified when that operation completes, and when it does, the utility calls MoveNext on the enumerator again. Calling MoveNext re-enters the iterator method at the code location just after the last yield point, allowing it to run until it yields another asynchronous operation. In this fashion, the iterator method can in effect yield asynchronous operations, and by using “yield return” to mark those async points, you as the developer can write an asynchronous method in a manner that looks largely sequential.
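The utility function described above can be sketched without Tasks at all. In this sketch, an asynchronous operation is modeled as a hypothetical Action<Action>: something that, given a completion callback, arranges to invoke it later (standing in for ThingThatHasCallbackWhenCompletes). SimulatedIo, Run, and Work are illustrative names rather than any library’s API, the I/O is simulated with a sleeping thread-pool work item, and there is no error handling.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical "operation that calls back when done": given a completion
// callback, it arranges for that callback to run later.
Action<Action> SimulatedIo(int milliseconds) =>
    onCompleted => ThreadPool.QueueUserWorkItem(_ =>
    {
        Thread.Sleep(milliseconds);   // simulation only: stands in for device latency
        onCompleted();                // signal that the operation finished
    });

// The driver: get the next yielded operation and, when it completes,
// call MoveNext again, which resumes the iterator after its last yield.
void Run(IEnumerator<Action<Action>> e, Action done)
{
    if (e.MoveNext())
    {
        e.Current(() => Run(e, done));
    }
    else
    {
        e.Dispose();
        done();
    }
}

// An "asynchronous method" written as an iterator.
IEnumerable<Action<Action>> Work(List<string> entries)
{
    entries.Add("start");
    yield return SimulatedIo(10);
    entries.Add("after first operation");
    yield return SimulatedIo(10);
    entries.Add("after second operation");
}

var log = new List<string>();
using var finished = new ManualResetEventSlim();
Run(Work(log).GetEnumerator(), () => finished.Set());
finished.Wait();   // blocking only so this console demo can observe the result
Console.WriteLine(string.Join(" -> ", log));
```

Because Run calls itself from each completion callback, an operation that invoked its callback synchronously would deepen the stack on every step; that is the same concern the CompletedSynchronously checks in the hand-written version earlier deal with.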

Now, think about the description above for the kind of object that needs to be yielded: “something that can invoke a callback when an operation completes.” Sound familiar? The System.Threading.Tasks.Task class in .NET 4 provides this exact functionality. A Task represents an asynchronous operation, and it has a ContinueWith method that enables a callback to be invoked when that asynchronous operation completes. Thus, we should be able to yield Task instances from an iterator in order to write an asynchronous method. Here’s the same CopyStreamToStream example implemented asynchronously in this fashion:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    // Buffer space for the data to be read and written
    byte [] buffer = new byte[0x2000];

    // While there’s data to be read and written
    while(true)
    {
        // Read data asynchronously. When the operation completes,
        // if no data could be read, we’re done.
        var read = Task<int>.Factory.FromAsync(
            input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null,
            TaskCreationOptions.DetachedFromParent);
        yield return read;
        if (read.Result == 0) break;

        // Write the data asynchronously
        yield return Task.Factory.FromAsync(
            output.BeginWrite, output.EndWrite, buffer, 0, read.Result, null,
            TaskCreationOptions.DetachedFromParent);
    }
}

Much simpler. Where we were previously doing synchronous reads and writes, we’re now yielding the result of calling the built-in Task.Factory.FromAsync method, which creates Tasks that represent asynchronous reads and writes exposed through the Asynchronous Programming Model (APM) Begin/End pattern. We could of course simplify this code further by using a few helper extension methods to hide some of the asynchronous details (these helpers are part of the Beta 1 samples at https://code.msdn.microsoft.com/ParExtSamples):

public static Task<int> ReadTask(this Stream stream,
    byte [] buffer, int offset, int count)
{
    return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead,
        buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}

public static Task WriteTask(this Stream stream,
    byte [] buffer, int offset, int count)
{
    return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite,
        buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}

This then enables the previous code to be simplified to:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    byte [] buffer = new byte[0x2000];
    while(true)
    {
        var read = input.ReadTask(buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        yield return output.WriteTask(buffer, 0, read.Result);
    }
}

That looks a lot like the synchronous version, and is *much* easier and less error-prone to write than the manual version shown earlier.

Of course, now we need a mechanism for iterating over the asynchronous iterator. As mentioned, we can take advantage of ContinueWith for the main body of the operation (as with the earlier helpers, the following method and several variants of it are available in the Beta 1 samples):

public static Task Iterate(this TaskFactory factory,
    IEnumerable<Task> asyncIterator)
{
    // Validate parameters
    if (factory == null) throw new ArgumentNullException("factory");
    if (asyncIterator == null)
        throw new ArgumentNullException("asyncIterator");

    // Get the scheduler to use, either the one provided by the factory
    // or the current one if the factory didn’t have one specified.
    var scheduler = factory.TaskScheduler ?? TaskScheduler.Current;

    // Get an enumerator from the enumerable
    var enumerator = asyncIterator.GetEnumerator();
    if (enumerator == null) throw new InvalidOperationException();

    // Create the task to be returned to the caller. And ensure
    // that when everything is done, the enumerator is cleaned up.
    var trs = new TaskCompletionSource<object>(factory.CreationOptions);
    trs.Task.ContinueWith(_ => enumerator.Dispose(),
        TaskContinuationOptions.DetachedFromParent, scheduler);

    // This will be called every time more work can be done.
    Action<Task> recursiveBody = null;
    recursiveBody = antecedent =>
    {
        try
        {
            // If the previous task completed with any exceptions, bail
            if (antecedent != null && antecedent.IsFaulted)
                trs.TrySetException(antecedent.Exception);

            // If the user requested cancellation, bail.
            else if (trs.Task.IsCancellationRequested) trs.TrySetCanceled();

            // If we should continue iterating and there’s more to iterate
            // over, create a continuation to continue processing. We only
            // want to continue processing once the current Task (as yielded
            // from the enumerator) is complete.
            else if (enumerator.MoveNext())
                enumerator.Current.ContinueWith(recursiveBody,
                    TaskContinuationOptions.DetachedFromParent, scheduler).
                        IgnoreExceptions();

            // Otherwise, we’re done!
            else trs.TrySetResult(null);
        }
        // If MoveNext throws an exception, propagate that to the user
        catch (Exception exc) { trs.TrySetException(exc); }
    };

    // Get things started by launching the first task
    factory.StartNew(() => recursiveBody(null),
        TaskCreationOptions.DetachedFromParent, scheduler).
            IgnoreExceptions();

    // Return the representative task to the user
    return trs.Task;
}

Using this implementation, we can now run “asynchronous methods” that return IEnumerable<Task>, as did our CopyStreamToStreamAsync method:

var asyncOperation = Task.Factory.Iterate(
    CopyStreamToStreamAsync(input, output));

Note that the Iterate implementation shown includes a few handy additions on top of the earlier hand-coded solution. First, Iterate returns a Task, which can be used to track the entire asynchronous operation. Second, it supports cancellation, meaning a caller can request that the asynchronous iteration shut down early, even if it hasn’t completed yet. Third, the logic that drives the iteration now lives in a separate, reusable method, so the target asynchronous method no longer needs all of that goop.
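The Iterate method above targets the .NET 4 Beta 1 API; TaskCreationOptions.DetachedFromParent and Task.IsCancellationRequested did not survive into the released TPL, where tasks are detached by default and cancellation flows through a CancellationToken. The following is a sketch of the same driver loop adapted to the released API, under a few assumptions: cancellation is requested through a token parameter, the TaskFactory parameter and scheduler selection are dropped in favor of TaskScheduler.Default, and the helpers are local functions rather than extension methods so the example is self-contained. It then copies one in-memory stream to another to show the returned Task tracking the whole operation.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch of Iterate on the released TPL. Token-based cancellation is an
// assumption standing in for Beta 1's Task.IsCancellationRequested.
Task Iterate(IEnumerable<Task> asyncIterator, CancellationToken token)
{
    IEnumerator<Task> enumerator = asyncIterator.GetEnumerator();
    var tcs = new TaskCompletionSource<object>();

    // However the overall operation ends, clean up the enumerator afterward.
    tcs.Task.ContinueWith(_ => enumerator.Dispose(), TaskScheduler.Default);

    Action<Task> recursiveBody = null;
    recursiveBody = antecedent =>
    {
        try
        {
            // A yielded task faulted: surface its exceptions and stop.
            if (antecedent != null && antecedent.IsFaulted)
                tcs.TrySetException(antecedent.Exception.InnerExceptions);
            // The caller asked to stop early.
            else if (token.IsCancellationRequested)
                tcs.TrySetCanceled();
            // Resume the iterator only after the task it yields completes.
            else if (enumerator.MoveNext())
                enumerator.Current.ContinueWith(recursiveBody, TaskScheduler.Default);
            // The iterator ran to the end.
            else
                tcs.TrySetResult(null);
        }
        catch (Exception exc) { tcs.TrySetException(exc); }  // MoveNext threw
    };

    Task.Factory.StartNew(() => recursiveBody(null), CancellationToken.None,
        TaskCreationOptions.None, TaskScheduler.Default);
    return tcs.Task;
}

Task<int> ReadTask(Stream s, byte[] buffer, int offset, int count) =>
    Task<int>.Factory.FromAsync(s.BeginRead, s.EndRead, buffer, offset, count, null);

Task WriteTask(Stream s, byte[] buffer, int offset, int count) =>
    Task.Factory.FromAsync(s.BeginWrite, s.EndWrite, buffer, offset, count, null);

IEnumerable<Task> CopyStreamToStreamAsync(Stream input, Stream output)
{
    byte[] buffer = new byte[0x2000];
    while (true)
    {
        Task<int> read = ReadTask(input, buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        yield return WriteTask(output, buffer, 0, read.Result);
    }
}

// Usage: copy 20,000 random bytes between two in-memory streams.
byte[] source = new byte[20000];
new Random(42).NextBytes(source);
var destination = new MemoryStream();
Task copy = Iterate(CopyStreamToStreamAsync(new MemoryStream(source), destination),
    CancellationToken.None);
copy.Wait();   // blocking only so this console demo can inspect the result
Console.WriteLine($"Copied {destination.Length} bytes, status {copy.Status}");
```

Blocking with Wait is only for the console demonstration; in a UI or server setting, you would instead attach a continuation to the returned task.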

On top of all that, this implementation is now based on TaskFactory, which means we can do things like ensure that the code runs on a certain scheduler, such as a scheduler that targets the UI. As an example of where that is handy, consider a method that asynchronously reads from a long, remote stream and stores the resulting data into a TextBox as it’s available:

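// An instance method, since it touches the form’s myTextBox field
IEnumerable<Task> ReadStreamIntoTextBox(Stream stream)
{
    byte [] buffer = new byte[0x2000];
    // A Decoder carries state between calls, so a multi-byte UTF-8
    // character split across two reads is still decoded correctly
    Decoder dec = Encoding.UTF8.GetDecoder();
    char [] chars = new char[Encoding.UTF8.GetMaxCharCount(buffer.Length)];
    while(true)
    {
        var read = stream.ReadTask(buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        int numChars = dec.GetChars(buffer, 0, read.Result, chars, 0);
        myTextBox.Text += new string(chars, 0, numChars);
    }
}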

As previously shown, I could invoke this method as follows:

public void button1_Click(object sender, EventArgs e)
{
    Task.Factory.Iterate(ReadStreamIntoTextBox(inputStream)); // buggy
}

However, accessing myTextBox.Text from a thread other than the one that created myTextBox is a no-no, and yet that’s potentially what will happen above: with no scheduler specified, Iterate falls back to TaskScheduler.Current, which from a UI event handler is the default scheduler, so the iterator’s code resumes on ThreadPool threads. To address that, I want to ensure that the code in the iterator executes on the UI thread (without letting the asynchronous operations block the UI thread). To accomplish that, I can create a TaskFactory that runs tasks on the UI thread, as in the following code:

public void button1_Click(object sender, EventArgs e)
{
    var uiFactory = new TaskFactory(
        TaskScheduler.FromCurrentSynchronizationContext());
    uiFactory.Iterate(ReadStreamIntoTextBox(inputStream));
}

Of course, while just being able to yield individual operations is useful, things get more interesting when you start considering multi-task continuations, as exposed through ContinueWhenAny and ContinueWhenAll. For example, our previous copy-stream example reads, then writes, then reads, then writes, and so forth. But we should be able to write the previously read chunk while reading the next one, achieving better throughput by overlapping latencies. Writing the code to do that using manual asynchrony would be a nightmare; with iterators and tasks, it’s manageable, almost fun:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    byte[][] buffers = new byte[2][] { 
        new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
    int filledBufferNum = 0;
    Task writeTask = null;

    while (true)
    {
        …
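One way to express that overlap is sketched below, as a hedged illustration of the idea rather than the original listing. It keeps two buffers, starts a read into one while the previous chunk from the other is still being written, and yields a single task from Task.Factory.ContinueWhenAll that finishes only when both have completed. The continuation calls Task.WaitAll so that a failed read or write faults the combined task instead of being silently swallowed. BUFFER_SIZE, the ReadTask/WriteTask local functions, and the minimal Drive loop (Iterate without cancellation or scheduler selection) are assumptions made to keep the example self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

const int BUFFER_SIZE = 0x2000;   // assumed chunk size

Task<int> ReadTask(Stream s, byte[] b, int offset, int count) =>
    Task<int>.Factory.FromAsync(s.BeginRead, s.EndRead, b, offset, count, null);

Task WriteTask(Stream s, byte[] b, int offset, int count) =>
    Task.Factory.FromAsync(s.BeginWrite, s.EndWrite, b, offset, count, null);

// Double-buffered copy: writing one chunk overlaps reading the next.
IEnumerable<Task> CopyStreamToStreamAsync(Stream input, Stream output)
{
    byte[][] buffers = { new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
    int filledBufferNum = 0;
    Task writeTask = null;

    while (true)
    {
        // Read into the buffer that is not being written right now.
        Task<int> readTask = ReadTask(input, buffers[filledBufferNum], 0, BUFFER_SIZE);

        // Resume once the read, and the previous write if any, are both done.
        // WaitAll rethrows any failure so the combined task faults too.
        if (writeTask == null) yield return readTask;
        else yield return Task.Factory.ContinueWhenAll(
            new[] { readTask, writeTask }, tasks => Task.WaitAll(tasks));

        if (readTask.Result == 0) break;

        // Start writing this chunk, then switch buffers for the next read.
        writeTask = WriteTask(output, buffers[filledBufferNum], 0, readTask.Result);
        filledBufferNum ^= 1;
    }
}

// Minimal driver: resume the iterator each time the yielded task completes.
Task Drive(IEnumerable<Task> iterator)
{
    IEnumerator<Task> e = iterator.GetEnumerator();
    var tcs = new TaskCompletionSource<object>();
    tcs.Task.ContinueWith(_ => e.Dispose(), TaskScheduler.Default);

    Action<Task> step = null;
    step = antecedent =>
    {
        try
        {
            if (antecedent != null && antecedent.IsFaulted)
                tcs.TrySetException(antecedent.Exception.InnerExceptions);
            else if (e.MoveNext())
                e.Current.ContinueWith(step, TaskScheduler.Default);
            else
                tcs.TrySetResult(null);
        }
        catch (Exception ex) { tcs.TrySetException(ex); }
    };
    step(null);
    return tcs.Task;
}

// Usage: three full chunks plus a partial one.
byte[] source = new byte[3 * BUFFER_SIZE + 123];
new Random(1).NextBytes(source);
var destination = new MemoryStream();
Drive(CopyStreamToStreamAsync(new MemoryStream(source), destination)).Wait();
Console.WriteLine($"Copied {destination.Length} of {source.Length} bytes");
```

A buffer is never refilled until the combined task has confirmed that its previous write finished, which is what makes reusing just two buffers safe.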

Author


Stephen Toub is a developer on the .NET team at Microsoft.
