December 15th, 2011

Awaiting Socket Operations

Stephen Toub - MSFT
Partner Software Engineer

The System.Net.Sockets.Socket class in .NET exposes multiple sets of asynchronous methods that perform the same basic operations but expose them through different patterns.

The first set follows the Asynchronous Programming Model (APM), where a synchronous method like Receive is accompanied by a BeginReceive and EndReceive pair.  If you want to be able to “await” such asynchronous operations in an async method, the easiest way to do so is to create a Task-based wrapper.  That can be done using Task.Factory.FromAsync, or it can be done more manually by using a TaskCompletionSource<TResult>, e.g.

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>();
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    { 
        try { tcs.TrySetResult(socket.EndReceive(iar)); }
        catch (Exception exc) { tcs.TrySetException(exc); }
    }, null);
    return tcs.Task;
}

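or with a slightly more efficient implementation that avoids the closure allocation by passing the socket and the TaskCompletionSource<int> through the state objects instead of capturing them in the lambda: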

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>(socket);
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    {
        var t = (TaskCompletionSource<int>)iar.AsyncState;
        var s = (Socket)t.Task.AsyncState;
        try { t.TrySetResult(s.EndReceive(iar)); }
        catch (Exception exc) { t.TrySetException(exc); }
    }, tcs);
    return tcs.Task;
}
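
The Task.Factory.FromAsync route mentioned earlier can be sketched in a similar way.  This is only a minimal illustration: the class name SocketFromAsyncExtensions and the method name ReceiveViaFromAsync are hypothetical, chosen so as not to collide with the ReceiveAsync wrappers shown here.  Because BeginReceive takes more arguments than the typed FromAsync overloads accept, the begin call is supplied as a lambda.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class SocketFromAsyncExtensions
{
    // Hypothetical name: wraps BeginReceive/EndReceive in a Task<int>
    // by handing both halves of the APM pair to FromAsync.
    public static Task<int> ReceiveViaFromAsync(
        this Socket socket, byte[] buffer, int offset, int size,
        SocketFlags socketFlags)
    {
        return Task<int>.Factory.FromAsync(
            // FromAsync provides the callback and state to pass through.
            (callback, state) => socket.BeginReceive(
                buffer, offset, size, socketFlags, callback, state),
            // Invoked on completion; its return value becomes the task result.
            iar => socket.EndReceive(iar),
            null);
    }
}
```

The lambdas here capture the socket and buffer, so this version still allocates per call, just like the first TaskCompletionSource<int> example.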

For many asynchronous methods, this approach is good enough.  The overhead involved in creating a new TaskCompletionSource<TResult> for each operation is often dwarfed by the costs of the operation itself.

However, there are some scenarios where this is not sufficient, and the problem is not only the additional tasks but the APM implementation itself.  If you’re making thousands upon thousands of socket calls asynchronously per second, that’s thousands upon thousands of IAsyncResult objects getting created.  That’s a lot of pressure on the garbage collector, and can result in unacceptable pauses in some networking-intensive apps.

To address that, Socket exposes another set of asynchronous methods.  These methods look similar to the Event-based Async Pattern (EAP), but they’re subtly different.  Basically, you create a SocketAsyncEventArgs instance, and you configure that instance with a buffer, with a Completed event handler, and so on.  Then you pass this instance into methods like ReceiveAsync and SendAsync.  Those methods return a Boolean indicating whether the operation is still pending.  A return value of false means the operation completed synchronously: the Completed event is not raised, the event args instance is immediately reusable, and the caller is responsible for finishing the operation synchronously (e.g. checking the status of the operation, getting from the SocketAsyncEventArgs instance the number of bytes received, etc.).  A return value of true means the operation didn’t complete synchronously, so the Completed event will be raised when it does complete, and that handler does the same completion logic we would have done synchronously.  While a bit more complicated, the benefit of this pattern is that these SocketAsyncEventArgs instances can be reused, such that once an instance is no longer being used by a first asynchronous call, it may be reused for a second, and then for a third, and so on.  This avoids needing to construct additional objects per async operation, which thereby decreases pressure on the garbage collector.
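
To make that pattern concrete, here is a rough sketch of a receive loop written directly against SocketAsyncEventArgs, without any await support.  The EventArgsReceiveLoop type and its onBytes callback are hypothetical names invented for this illustration; only SetBuffer, Completed, ReceiveAsync, SocketError and BytesTransferred come from the real API.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical helper: reports received byte counts using a single,
// reusable SocketAsyncEventArgs instance.
public sealed class EventArgsReceiveLoop
{
    private readonly Socket _socket;
    private readonly SocketAsyncEventArgs _args = new SocketAsyncEventArgs();
    private readonly Action<int> _onBytes;

    public EventArgsReceiveLoop(Socket socket, Action<int> onBytes)
    {
        _socket = socket;
        _onBytes = onBytes;
        _args.SetBuffer(new byte[0x1000], 0, 0x1000);
        // Raised only for operations that complete asynchronously.
        _args.Completed += (sender, e) => { if (Process(e)) Start(); };
    }

    public void Start()
    {
        // ReceiveAsync returns false when the operation completed
        // synchronously; Completed is not raised in that case, so the
        // result is handled right here and the same instance is reused.
        while (!_socket.ReceiveAsync(_args))
        {
            if (!Process(_args)) return;
        }
        // It returned true: the operation is pending, and the Completed
        // handler will pick up where this loop left off.
    }

    // Returns true if another receive should be issued.
    private bool Process(SocketAsyncEventArgs e)
    {
        if (e.SocketError != SocketError.Success || e.BytesTransferred == 0)
            return false;
        _onBytes(e.BytesTransferred);
        return true;
    }
}
```

Note how the completion logic has to be reachable from two places, the synchronous path in Start and the Completed handler.  That duplication is what makes this pattern more awkward to use than a simple await.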

We could of course wrap such asynchronous methods with Task, as well, but that would largely defeat the purpose of these methods, since we’d be allocating a Task per operation, just as the APM methods allocate an IAsyncResult.  Still, though, we want to be able to take advantage of the compiler’s async/await support to make it easier to write asynchronous code using sockets.  We want our cake and to eat it, too.

Of course, the compiler supports awaiting more than just Tasks.  So if you have a specialized scenario like this, you can take advantage of the compiler’s pattern-based support for awaiting things.  Below, I’ve implemented an awaitable wrapper for a SocketAsyncEventArgs:

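using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public sealed class SocketAwaitable : INotifyCompletion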
{
    private readonly static Action SENTINEL = () => { };

    internal bool m_wasCompleted;
    internal Action m_continuation;
    internal SocketAsyncEventArgs m_eventArgs;

    public SocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        if (eventArgs == null) throw new ArgumentNullException("eventArgs");
        m_eventArgs = eventArgs;
        eventArgs.Completed += delegate
        {
            var prev = m_continuation ?? Interlocked.CompareExchange(
                ref m_continuation, SENTINEL, null);
            if (prev != null) prev();
        };
    }

    internal void Reset()
    {
        m_wasCompleted = false;
        m_continuation = null;
    }

    public SocketAwaitable GetAwaiter() { return this; }

    public bool IsCompleted { get { return m_wasCompleted; } }

    public void OnCompleted(Action continuation)
    {
        if (m_continuation == SENTINEL ||
            Interlocked.CompareExchange(
                ref m_continuation, continuation, null) == SENTINEL)
        {
            Task.Run(continuation);
        }
    }

    public void GetResult()
    {
        if (m_eventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)m_eventArgs.SocketError);
    }
}

With that helper type, I can write a short three-line wrapper method for each of the Socket methods I care about, e.g.

public static class SocketExtensions
{
    public static SocketAwaitable ReceiveAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.ReceiveAsync(awaitable.m_eventArgs))
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    public static SocketAwaitable SendAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.SendAsync(awaitable.m_eventArgs)) 
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    // …
}

And with that, I can use async/await with these very efficient asynchronous methods on Socket, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new SocketAwaitable(args);

    // Do processing, continually receiving from the socket
    while (true)
    {
        await s.ReceiveAsync(awaitable);
        int bytesRead = args.BytesTransferred;
        if (bytesRead <= 0) break;

        Console.WriteLine(bytesRead);
    }
}

Here, when I make a call to my ReceiveAsync extension method, I’m using a reusable SocketAsyncEventArgs that was previously created and wrapped into one of my helper SocketAwaitable types.  ReceiveAsync doesn’t need to do any allocation; it simply calls the underlying ReceiveAsync on the Socket, passing in the wrapped SocketAsyncEventArgs.  If the operation completes synchronously, we just set the awaitable’s m_wasCompleted to true, such that when the compiler-generated code awaits the returned SocketAwaitable, it sees that IsCompleted is true, and simply proceeds to call GetResult, which will throw an exception if the socket operation failed.  If the underlying ReceiveAsync call does not complete synchronously, then m_wasCompleted will remain false, and it’ll be up to the OnCompleted method to store the await continuation to be run when the SocketAsyncEventArgs’ Completed handler is invoked.  The only time we need to schedule a task here is if the socket operation asynchronously completes between the time that the compiler-generated code checks IsCompleted and the time it invokes OnCompleted, and that should be rare.
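
To see the order in which the compiler-generated code touches an awaitable, it can help to await a type that simply records each call.  The sketch below is purely illustrative and involves no sockets; TracingAwaitable, TracingDemo and Complete are hypothetical names, with Complete standing in for the Completed event firing later.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical awaitable that logs which members the compiler-generated
// code calls on it, and in what order.
public sealed class TracingAwaitable : INotifyCompletion
{
    public readonly List<string> Calls = new List<string>();
    private readonly bool _completed;
    private Action _continuation = () => { };

    public TracingAwaitable(bool completed) { _completed = completed; }

    public TracingAwaitable GetAwaiter() { Calls.Add("GetAwaiter"); return this; }

    public bool IsCompleted
    {
        get { Calls.Add("IsCompleted"); return _completed; }
    }

    public void OnCompleted(Action continuation)
    {
        Calls.Add("OnCompleted");
        _continuation = continuation; // stored until Complete() is called
    }

    public void GetResult() { Calls.Add("GetResult"); }

    // Simulates the operation finishing later by running the continuation.
    public void Complete() { _continuation(); }
}

public static class TracingDemo
{
    public static async Task AwaitIt(TracingAwaitable awaitable)
    {
        await awaitable;
    }
}
```

Awaiting new TracingAwaitable(true) records GetAwaiter, IsCompleted, GetResult, which corresponds to the synchronous-completion path.  Awaiting new TracingAwaitable(false) records GetAwaiter, IsCompleted, OnCompleted, and GetResult is recorded only once Complete() runs the stored continuation.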

If you wanted to write a bit more code, you could even create a specialized version of SocketAwaitable such that its GetResult method returned the result of the operation, e.g. a ReceiveSocketAwaitable whose GetResult method returned the SocketAsyncEventArgs.BytesTransferred value.  With that, you could then write the above loop almost as if it were synchronous code, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new ReceiveSocketAwaitable(args);

    // Do processing, continually receiving from the socket
    int bytesRead;
    while ((bytesRead = await s.ReceiveAsync(awaitable)) > 0)
    {
        Console.WriteLine(bytesRead);
    }
}
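
The ReceiveSocketAwaitable type used above isn’t shown in this post, so what follows is only one possible sketch of it, not a definitive implementation.  It assumes the same completion logic as SocketAwaitable, copied into a separate type since SocketAwaitable is sealed, with GetResult changed to return BytesTransferred.  The ReceiveSocketExtensions class name is likewise an assumption.

```csharp
using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Sketch: mirrors SocketAwaitable, but GetResult returns the byte count.
public sealed class ReceiveSocketAwaitable : INotifyCompletion
{
    private static readonly Action SENTINEL = () => { };

    internal bool m_wasCompleted;
    internal Action m_continuation;
    internal readonly SocketAsyncEventArgs m_eventArgs;

    public ReceiveSocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        if (eventArgs == null) throw new ArgumentNullException("eventArgs");
        m_eventArgs = eventArgs;
        eventArgs.Completed += delegate
        {
            // Run the stored continuation, or leave SENTINEL behind so
            // OnCompleted knows the operation has already finished.
            var prev = m_continuation ?? Interlocked.CompareExchange(
                ref m_continuation, SENTINEL, null);
            if (prev != null) prev();
        };
    }

    internal void Reset()
    {
        m_wasCompleted = false;
        m_continuation = null;
    }

    public ReceiveSocketAwaitable GetAwaiter() { return this; }

    public bool IsCompleted { get { return m_wasCompleted; } }

    public void OnCompleted(Action continuation)
    {
        if (m_continuation == SENTINEL ||
            Interlocked.CompareExchange(
                ref m_continuation, continuation, null) == SENTINEL)
        {
            Task.Run(continuation);
        }
    }

    // The only behavioral difference: hand back the number of bytes received.
    public int GetResult()
    {
        if (m_eventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)m_eventArgs.SocketError);
        return m_eventArgs.BytesTransferred;
    }
}

public static class ReceiveSocketExtensions
{
    public static ReceiveSocketAwaitable ReceiveAsync(this Socket socket,
        ReceiveSocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.ReceiveAsync(awaitable.m_eventArgs))
            awaitable.m_wasCompleted = true;
        return awaitable;
    }
}
```

Because GetResult now returns int, the await expression itself evaluates to the byte count, which is what lets the loop condition above read like a synchronous Receive call.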

Enjoy.


Stephen Toub is a developer on the .NET team at Microsoft.
