Building Async Coordination Primitives, Part 4: AsyncBarrier

Stephen Toub - MSFT

Last time, we looked at building an AsyncCountdownEvent.  At the end of the post, I highlighted a common pattern for using such a type, which is for all of the participants to signal and then wait for all of the other participants to signal as well.  This kind of synchronization is typically referred to as a “barrier,” and it is often used in rounds.  Each participant will do some work, then signal-and-wait for the barrier, then do the next round of work, then signal-and-wait for the barrier, and so on.

Just as we built an AsyncCountdownEvent, we can easily build an AsyncBarrier.  Here’s the shape of our type:

public class AsyncBarrier
{
    public AsyncBarrier(int participantCount);
    public Task SignalAndWait();
}

To start, we need a few members.  As with AsyncCountdownEvent, we need to know how many participants there are.  However, with AsyncCountdownEvent we didn’t need to store that number permanently… we only needed to know how many signals were remaining.  With AsyncBarrier, every time we get down to 0, we’ll need to reset back to the original number of participants, so we need to store both the participant count and the remaining number of participants that still need to signal.  We also need a TaskCompletionSource<bool> to create the task that all of the participants will wait on.

private readonly int m_participantCount;
private TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();
private int m_remainingParticipants;

Our constructor will simply initialize the counts based on the user-supplied number of participants we expect:

public AsyncBarrier(int participantCount)
{
    if (participantCount <= 0) throw new ArgumentOutOfRangeException("participantCount");
    m_remainingParticipants = m_participantCount = participantCount;
}

Finally, our SignalAndWait method is very similar to that of AsyncCountdownEvent, in that we decrement the number of remaining participants, and when the count reaches 0, we complete the TaskCompletionSource<bool>.  However, here we need to be careful about the order in which we do that.  Once a participant wakes up from waiting, it is likely to do the work for its next round of processing and then signal-and-wait on the barrier again, which means that by that time the barrier needs to have already been reconfigured for the next round.  That means that after our count hits 0 but before we complete the task so as to wake the waiters, we need to reset the remaining count back up to the participant count, and we need to swap in a new TaskCompletionSource<bool> to be used for the next round (this example class does not handle the more difficult error case where more participants than expected join the fray):

public Task SignalAndWait()
{
    var tcs = m_tcs;
    if (Interlocked.Decrement(ref m_remainingParticipants) == 0)
    {
        m_remainingParticipants = m_participantCount;
        m_tcs = new TaskCompletionSource<bool>();
        tcs.SetResult(true);
    }
    return tcs.Task;
}
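
To see the round-based pattern in action, here is a minimal sketch that drives the barrier above through several rounds.  The BarrierDemo class, the RunRoundsAsync helper, and the per-round counters are hypothetical names introduced only for illustration, and Task.Yield stands in for whatever real work each round would do:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Same barrier as shown above, repeated so the sample compiles on its own.
public class AsyncBarrier
{
    private readonly int m_participantCount;
    private TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();
    private int m_remainingParticipants;

    public AsyncBarrier(int participantCount)
    {
        if (participantCount <= 0) throw new ArgumentOutOfRangeException("participantCount");
        m_remainingParticipants = m_participantCount = participantCount;
    }

    public Task SignalAndWait()
    {
        var tcs = m_tcs;
        if (Interlocked.Decrement(ref m_remainingParticipants) == 0)
        {
            m_remainingParticipants = m_participantCount;
            m_tcs = new TaskCompletionSource<bool>();
            tcs.SetResult(true);
        }
        return tcs.Task;
    }
}

public static class BarrierDemo
{
    // Hypothetical helper: runs 'participants' workers through 'rounds' rounds and
    // returns, for each round, how many workers finished that round's work.
    public static async Task<int[]> RunRoundsAsync(int participants, int rounds)
    {
        var barrier = new AsyncBarrier(participants);
        var finishedPerRound = new int[rounds];

        Func<Task> worker = async () =>
        {
            for (int round = 0; round < rounds; round++)
            {
                await Task.Yield(); // stand-in for this round's real work
                Interlocked.Increment(ref finishedPerRound[round]);

                await barrier.SignalAndWait();

                // Past the barrier, every participant must have finished this round.
                if (Volatile.Read(ref finishedPerRound[round]) != participants)
                    throw new InvalidOperationException("Barrier released a participant early.");
            }
        };

        await Task.WhenAll(Enumerable.Range(0, participants).Select(_ => Task.Run(worker)));
        return finishedPerRound;
    }

    public static void Main()
    {
        int[] counts = RunRoundsAsync(4, 3).GetAwaiter().GetResult();
        Console.WriteLine(string.Join(",", counts)); // prints 4,4,4
    }
}
```

The check after each await is the property the barrier is meant to provide: no participant starts round N+1 until every participant has finished round N.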

That’s it.  Of course, there are other ways we could implement this.  One potential problem with the above design could arise if all of the participants chose to use synchronous continuations, as the continuations would then all end up getting serialized.  Dealing with that could just be left up to the participants, or we could help them by implementing this so that each participant got their own Task, with us completing each task in parallel.  That might look instead like the following:

public class AsyncBarrier
{
    private readonly int m_participantCount;
    private int m_remainingParticipants;
    private ConcurrentStack<TaskCompletionSource<bool>> m_waiters;

    public AsyncBarrier(int participantCount)
    {
        if (participantCount <= 0) throw new ArgumentOutOfRangeException("participantCount");
        m_remainingParticipants = m_participantCount = participantCount;
        m_waiters = new ConcurrentStack<TaskCompletionSource<bool>>();
    }

    public Task SignalAndWait()
    {
        var tcs = new TaskCompletionSource<bool>();
        m_waiters.Push(tcs);
        if (Interlocked.Decrement(ref m_remainingParticipants) == 0)
        {
            m_remainingParticipants = m_participantCount;
            var waiters = m_waiters;
            m_waiters = new ConcurrentStack<TaskCompletionSource<bool>>();
            Parallel.ForEach(waiters, w => w.SetResult(true));
        }
        return tcs.Task;
    }
}
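
On .NET Framework 4.6 and later, another option that may be worth considering is to keep the single shared task but create its TaskCompletionSource<bool> with TaskCreationOptions.RunContinuationsAsynchronously, so that continuations are scheduled rather than run inline on the thread that calls SetResult.  The following is only a sketch of that alternative, not part of the implementations above; the CreateTcs helper and the BarrierOptionsDemo class are hypothetical names used for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class AsyncBarrier
{
    private readonly int m_participantCount;
    private int m_remainingParticipants;
    private TaskCompletionSource<bool> m_tcs = CreateTcs();

    // Hypothetical helper: asks the runtime to schedule continuations instead of
    // running them synchronously inside SetResult.
    private static TaskCompletionSource<bool> CreateTcs()
    {
        return new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    }

    public AsyncBarrier(int participantCount)
    {
        if (participantCount <= 0) throw new ArgumentOutOfRangeException("participantCount");
        m_remainingParticipants = m_participantCount = participantCount;
    }

    public Task SignalAndWait()
    {
        var tcs = m_tcs;
        if (Interlocked.Decrement(ref m_remainingParticipants) == 0)
        {
            // Reset for the next round before waking anyone, as in the original version.
            m_remainingParticipants = m_participantCount;
            m_tcs = CreateTcs();
            tcs.SetResult(true);
        }
        return tcs.Task;
    }
}

public static class BarrierOptionsDemo
{
    public static void Main()
    {
        var barrier = new AsyncBarrier(2);

        Task first = barrier.SignalAndWait();
        Console.WriteLine(first.IsCompleted); // False: one participant is still missing

        Task second = barrier.SignalAndWait();
        Console.WriteLine(first.IsCompleted && second.IsCompleted); // True: the round is complete
    }
}
```

The trade-off in this sketch is that every continuation pays for a scheduling hop, whereas the per-participant design above only changes who completes each task.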

Next time, we’ll look at building an asynchronous semaphore.
