ParallelWhileNotEmpty

Stephen Toub - MSFT

Parallel Extensions includes the System.Threading.Parallel class, which provides several high-level loop replacement constructs like For and ForEach. In previous blog posts, we’ve taken a look at implementing other loops, such as for loops with arbitrary initialization, conditional, and update logic, range-based loops, and a parallel while.  There’s another interesting loop construct which some parallel libraries provide, that of a loop which processes elements from a collection until that collection is empty, but which allows the processing of items in the loop to add more items to the collection.  Such a looping construct isn’t included in Parallel Extensions, but you can certainly build one yourself.

First, why would such a “loop” be useful?  There are a variety of scenarios that call for a construct like this.  Consider a case where you have several Tree<T> instances, and you want to process all of the nodes in each of these trees.  This fits the pattern as a tree can produce more nodes to be processed (e.g. its children).

ParallelWhileNotEmpty(treeRoots, (node, adder) =>
{
    foreach(var child in node.Children) adder(child); 
    Process(node);
}

There are a variety of ways such a ParallelWhileNotEmpty loop could be built.  We’ll explore a few in this blog post, but this is by no means an exhaustive list.

First, let’s consider an approach that takes advantage of the parent/child relationships exposed by Tasks.  Tasks implicitly wait for their children to complete (for more examples of waiting, see Waiting for Tasks).  This allows me to write a concise (yet dense) implementation of ParallelWhileNotEmpty that looks like this:

public static void ParallelWhileNotEmpty<T>(
    IEnumerable<T> initialValues, Action<T, Action<T>> body)
{
    Action<T> addMethod = null;
    addMethod = v => Task.Create(delegate { body(v, addMethod); });
    Parallel.ForEach(initialValues, addMethod);
}

Not a lot of code, is it?  First, I create a delegate to be executed for every value.  This delegate creates a Task that executes the body action provided to the WhileNotEmpty method.  The code passes to the body the value to be processed, as well as an Action<T> delegate used to add more items for processing… what is that delegate?  It’s the same delegate to be executed for every value, e.g. it’s a recursive delegate call.  Then, using a Parallel.ForEach, it executes that same addMethod for every initialValue.  The crucial thing to realize here is that, under the covers, Parallel.ForEach uses Tasks.  It doesn’t necessarily use one Task per item (though in theory it could), but every item is processed within a Task.  The addMethod is creating more Tasks, and the parent tasks are implicitly waiting on their children (i.e. any tasks created from the parent).  Thus, the Parallel.ForEach won’t return until all of the values have been processed, including both the initialValues and any generated while processing.

One problem with this approach, however, is that any Task with children must be held alive until all of its children complete.  This means that for deep trees, tasks at the root of the tree could be kept alive well past when they’ve completed, leading to potential memory problems.  We could also quickly run into stack overflow issues.  We can address that with a few modifications to the initial implementation:

public static void ParallelWhileNotEmpty<T>(
    IEnumerable<T> initialValues, Action<T, Action<T>> body)
{
    TaskWaiter tasks = new TaskWaiter();
    Action<T> addMethod = null;
    addMethod = v =>
        tasks.Add(Task.Create(delegate { body(v, addMethod); },
            TaskCreationOptions.Detached));

    Parallel.ForEach(initialValues, addMethod);

    tasks.Wait();
}

Here I’m using the TaskWaiter implementation I wrote at Waiting for Tasks in order to track all of the tasks created by the addMethod delegate, and all of the Task instances are created as Detached, which means that no implicit waiting will be used from parent to child, and thus the children won’t keep the parents alive longer then they need to be (note that in the June 2008 CTP, this actually isn’t true, due to the Task.Creator property that holds a creator Task alive as long as the child Task is alive, but that’s something we’re addressing moving forward; if you run this with enough items in the June 2008 CTP, you may eventually run out of memory).  Then at the end of the of the ParallelWhileNotEmpty method, I simply wait on the TaskWaiter, which will only complete when all of the tasks created during processing have completed.

As mentioned earlier, there are plenty of implementations one could write to address this design.  Another would be based on alternating lists of Tasks.  The initial values to be processed are stored into the first list.  All those values are processed, and any new values they create are added to the second list.  Then the second list is processed, and any new values that are produced go back into the first list.  The the first list is processed, and… so on.  This continues until the next list to be processed has no values available:

public static void ParallelWhileNotEmpty<T>(
    IEnumerable<T> initialValues, Action<T, Action<T>> body)
{
    var lists = new [] {
        new ConcurrentStack<T>(initialValues), new ConcurrentStack<T>() };
    for(int i=0; ; i++)
    {
        int fromIndex = i % 2;
        var from = lists[fromIndex];
        var to = lists[fromIndex ^ 1];
        if (from.IsEmpty) break;

        Action<T> addMethod = v => to.Push(v);
        Parallel.ForEach(from.ToArray(), v => body(v, addMethod));
        from.Clear();
    }
}

Of course, there are also lots of variations that could be done on this implementation (the type of concurrent collection to use, whether to process the collection as an array or as an IEnumerable<T>, etc.), but you get the idea.

0 comments

Discussion is closed.

Feedback usabilla icon