August 12th, 2009

Implementing Parallel While with Parallel.ForEach

Stephen Toub - MSFT
Partner Software Engineer

The Parallel class in .NET 4 includes methods that implement three parallel constructs: parallelized for loops (Parallel.For), parallelized foreach loops (Parallel.ForEach), and parallelized statement regions (Parallel.Invoke).  One of the interesting things about Parallel.Invoke is that, in some cases and at least in the current implementation, it may actually delegate internally to Parallel.For.  This highlights that it’s possible to build new constructs in terms of these three building blocks.

One construct that’s not provided out-of-the-box but that we might want is a parallel while loop, the equivalent of:

while(condition()) body();

but executed in parallel, where multiple threads are in effect all executing this loop until condition() returns false.  While it may not be obvious at first glance, we can actually build that with just a few lines of code in terms of Parallel.ForEach.

Here’s the signature I’d like to be able to implement:

public static void While(
    ParallelOptions parallelOptions, Func<bool> condition, Action body);

In a previous post, we demonstrated how C# iterators can be used to feed a Parallel.ForEach to create support for types of loops for which no direct overloads were provided, and we can use that same technique here.  Consider the following iterator:

private static IEnumerable<bool> IterateUntilFalse(Func<bool> condition)
{
    while (condition()) yield return true;
}

This iterator yields an enumerable of Booleans.  The true/false value of the Booleans doesn’t actually matter, and Booleans were just chosen for convenience and simplicity.  What actually matters is how many elements are yielded.  Specifically, each call to MoveNext on the resulting iterator will evaluate condition(), and while condition() returns true, the enumerator will continue yielding values.  Once condition() returns false, the enumerator will be empty.

With that iterator in hand, implementing our parallel While method becomes easy:

public static void While(
    ParallelOptions parallelOptions, Func<bool> condition, Action body)
{
    Parallel.ForEach(IterateUntilFalse(condition), parallelOptions,
        ignored => body());
}

I could even update the signature to support a body that accepts a ParallelLoopState, so that the body of the loop is able to break out early through ParallelLoopState.Stop (see https://blogs.msdn.com/pfxteam/archive/2009/05/27/9645023.aspx for more info on exiting loops early):

public static void While(
    ParallelOptions parallelOptions, Func<bool> condition,
    Action<ParallelLoopState> body)
{
    Parallel.ForEach(IterateUntilFalse(condition), parallelOptions,
        (ignored, loopState) => body(loopState));
}

And so on.

Other variations are also possible, while still taking advantage of the same technique.  For example, the previous snippet invokes the condition() as part of the enumerated data source, and as has been discussed in previous posts, Parallel.ForEach needs to lock around access to the data source as it enumerates elements.  This means then that the condition is being invoked under a lock, and its invocation is serialized across multiple threads.  If the body delegate represents the bulk of the work and the condition delegate is cheap, this is probably fine.  If, however, the condition delegate represents any significant work, this could be problematic.  We can work around that with new code, like the following:

public static void While(
    ParallelOptions parallelOptions, Func<bool> condition,
    Action<ParallelLoopState> body)
{
    Parallel.ForEach(Infinite(), parallelOptions, (ignored, loopState) =>
    {
        if (condition()) body(loopState);
        else loopState.Stop();
    });
}

private static IEnumerable<bool> Infinite()
{
    while (true) yield return true;
}

Here we’re using an enumerable that yields an infinite stream of Booleans, and we’re using it purely to drive an infinite ForEach.  Of course, an infinite ForEach isn’t all that useful, so we rely on the body of the ForEach to check the condition(), and if it returns false, to use the ParallelLoopState.Stop method to cease execution.

We can take this a step further and eliminate the lock used by Parallel.ForEach altogether.  Removing such overheads can help to drive down how much work needs to be done by the condition and body delegates for parallel speedups to be achievable.  How do we eliminate the lock when it’s built into Parallel.ForEach?  By doing the partitioning ourselves.

In addition to overloads for IEnumerable<T>, Parallel.ForEach also has overloads that work with Partitioner<T>, a new type introduced in .NET 4.0 as part of the System.Collections.Concurrent namespace.  We’ll discuss Partitioner<T> more in future posts, but suffice it to say that Partitioner<T> hands out partitions for whatever data source it represents, and Parallel.ForEach’s worker threads simply take those partitions and enumerate them, one per thread.  Since there’s one partition per thread, Parallel.ForEach doesn’t use locks to access the partitions.  Thus, we can write our own “infinite” partitioner, that returns partitions with an infinite number of elements, and use it exactly as we’re using our Infinite() iterator, but with no locking costs.

Here’s the custom partitioner:

public class InfinitePartitioner : Partitioner<bool>
{
    public override IList<IEnumerator<bool>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException(“partitionCount”);
        return (from i in Enumerable.Range(0, partitionCount)
                  select InfiniteEnumerator()).ToArray();
    }

    public override bool SupportsDynamicPartitions { get { return true; } }

    public override IEnumerable<bool> GetDynamicPartitions()
    {
        return new InfiniteEnumerators();
    }

    private static IEnumerator<bool> InfiniteEnumerator()
    {
        while (true) yield return true;
    }

    private class InfiniteEnumerators : IEnumerable<bool>
    {
        public IEnumerator<bool> GetEnumerator()
        {
            return InfiniteEnumerator();
        }
        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
    }
}

To create a new partitioner, you derive a new type from Partitioner<T> and override some methods and properties.  Partitioner<T> provides support for both static partitioning, where the number of partitions is known ahead of time, and dynamic partitioning, where more partitions can be added dynamically.  Implementations of Partitioner<T> must support at least static partitioning, and thus the GetPartitions method is abstract and must be overridden.  GetPartitions accepts the number of partitions to create and returns a list of enumerators, one per partition, that can be enumerated to completely enumerate the data source.  Parallel.ForEach, however, demands dynamic partitioning support, as it will ramp up the number of threads participating in the loop as processing resources become available, and thus it needs to be able to dynamically create more partitions as they’re required.  To support dynamic partitioning, two members need to be overridden.  First, the SupportsDynamicPartitions property should be overridden to return true (the base class’ implementation returns false).  Second, the GetDynamicPartitions method needs to be overridden to return an object that performs the dynamic partitioning.  The object comes in the form of an IEnumerable<T>.  This might seem odd, until you think about the shape of the IEnumerable<T> interface: it provides one method, GetEnumerator, which is used to retrieve an enumerator, and it’s perfectly valid to call GetEnumerator multiple times to retrieve multiple enumerators.  Thus, every thread that partakes in the loop can call GetEnumerator on the returned IEnumerable<T> in order to get an enumerator for that partition.  Kind of slick, if not a bit mind-contorting.

Implementing GetPartitions is easy for our custom partitioner.  We simply use a LINQ query to generate partitionCount enumerators, each of which can be retrieved through a call to InfiniteEnumerator().

Implementing GetDynamicPartitions is only slightly more complex.  We need a class whose GetEnumerator method will return an enumerator appropriate for each partition. Thus, we create the InfiniteEnumerators class, whose GetEnumerator method simply returns the result of a call to InfiniteEnumerator(), and an instance of InfiniteEnumerators is returned from GetDynamicPartitions.

With that Partitioner written, now we can turn our attention back to While’s implementation, which looks almost identical to before:

public static void While(
    ParallelOptions parallelOptions, Func<bool> condition,
    Action<ParallelLoopState> body)
{
    Parallel.ForEach(new InfinitePartitioner(), parallelOptions,
        (ignored, loopState) =>
    {
        if (condition()) body(loopState));
        else loopState.Stop();
    });
}

The only difference between this implementation and the previous While is that we’re passing in an instance of InfinitePartitioner.

Author

Stephen Toub - MSFT
Partner Software Engineer

Stephen Toub is a developer on the .NET team at Microsoft.

2 comments

Discussion is closed. Login to edit/delete existing comments.