Exception Handling in TPL Dataflow Networks

Cristina Manu

A “dataflow block” is represented by a class implementing the IDataflowBlock interface. The state of a dataflow block is represented by the state of its IDataflowBlock.Completion Task, which itself has a Status property. When a dataflow block is active, meaning that it is currently processing or may do more processing in the future, its Completion task will be in a non-final state, such as TaskStatus.WaitingForActivation. Once the dataflow block has completed all of its processing, whether successfully or due to failure or cancellation, the Completion task’s Status will return one of the three values below:

  • RanToCompletion
  • Canceled
  • Faulted

This blog post will focus on the third state of a completed dataflow block: Faulted.

Current users of the Task Parallel Library (TPL) will notice that TPL Dataflow follows the same exception model as TPL. In TPL, if an exception goes unhandled in a Task’s delegate, the Task ends in the Faulted state. Likewise, in TPL Dataflow, if an exception goes unhandled during the processing of a message, the exception faults the block’s Completion task. The exception is available from the Exception property of the block’s Completion task, and it can be handled as in the TPL model.

Note, as well, that execution blocks (e.g. ActionBlock) may support parallel processing, such that the block can be processing multiple messages concurrently; this can be configured via the MaxDegreeOfParallelism property on the ExecutionDataflowBlockOptions used to configure the block when it is constructed. As with parallel loops and PLINQ queries, exceptions in such parallel processing do not forcibly interrupt other concurrent processing, nor is the construct’s processing considered completed the moment the exception occurs; if an exception does occur, the dataflow block will complete only once all processing has quiesced.
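As a minimal sketch of the model described above, the following program configures an ActionBlock for parallel processing and lets one message throw. The class name ParallelFaultDemo, the method RunAsync, and the choice of message 3 as the failing input are illustrative assumptions, not part of the library. Awaiting the Completion task rethrows the stored exception, while the Exception property exposes the full AggregateException.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; names are for illustration only.
public static class ParallelFaultDemo
{
    public static async Task<TaskStatus> RunAsync()
    {
        // Allow up to four messages to be processed concurrently.
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

        var block = new ActionBlock<int>(x =>
        {
            // Assumption for the demo: message 3 is the one that fails.
            if (x == 3) throw new InvalidOperationException("Failed on " + x);
        }, options);

        // Post returns false once the block starts declining; ignored here.
        for (int i = 1; i <= 8; i++) block.Post(i);
        block.Complete();

        try
        {
            // The task completes only after in-flight processing has finished.
            await block.Completion;
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine("Awaited: " + ex.Message);
        }

        // The Completion task keeps every stored exception in an AggregateException.
        foreach (Exception inner in block.Completion.Exception.InnerExceptions)
        {
            Console.WriteLine("Stored: " + inner.Message);
        }

        return block.Completion.Status;
    }

    public static Task Main() => RunAsync();
}
```

Using await rather than Wait() surfaces the original exception type directly, which is often more convenient than unwrapping an AggregateException in a catch block.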

Transition to “Faulted” state

If any of the following conditions is met, the block moves to the Faulted state.

1. Explicit Fault: The invocation of IDataflowBlock.Fault(Exception) will Fault the block. In this case, the exception provided as an argument to the Fault method will be used as the block’s Completion Task’s exception.

BufferBlock<int> block = new BufferBlock<int>(); 
((IDataflowBlock)block).Fault(new Exception("Test"));

2. Unhandled Delegate Exception: For blocks that run user delegates, e.g. ActionBlock, the user-provided delegate could throw exceptions. Any unhandled exception from these delegates will fault the block.

ActionBlock<int> action = new ActionBlock<int>((x) => 
{ 
    throw new Exception("Test"); 
});

action.Post(1);

try
{
    action.Completion.Wait();
}
catch(AggregateException ex)
{
    Console.WriteLine(ex.InnerException.Message);
}

3. Fault Propagation: The Faulted state of a source block can be automatically propagated to linked target blocks if the link between the blocks was configured with a DataflowLinkOptions whose PropagateCompletion property was set to true.

TransformBlock<int, int> transformBlock = 
    new TransformBlock<int, int>((x) => 
{ 
    if (x == 1) { throw new Exception("Test"); } else { return x; } 
});

BufferBlock<int> bufferBlock = new BufferBlock<int>();

transformBlock.LinkTo(bufferBlock, 
    new DataflowLinkOptions() { PropagateCompletion= true});

transformBlock.Post(1);
try
{
    bufferBlock.Completion.Wait();
}
catch(AggregateException ex)
{
    Console.WriteLine(ex.InnerException.InnerException.Message);
}

4. Incorrect Interface Implementations: If a block is incorrectly implemented such that one of its interface methods used for inter-block messaging throws an exception, that exception may fault the block with which it’s communicating. For example, a source block might be incorrectly implemented such that its ConsumeMessage method throws. Any linked block that tries to consume the message will become faulted, even if the incorrectly implemented block is not itself faulted. Similar behavior can be observed if an incorrectly implemented target throws from its OfferMessage method.

BufferBlock<int> bufferBlock = new BufferBlock<int>(
    new DataflowBlockOptions() { BoundedCapacity = 1 });

InvalidSource<int> source = new InvalidSource<int>(2);

bufferBlock.Post(1);
//buffer is full so the message will be postponed

source.LinkTo(bufferBlock);

//make room for the new message to be consumed
int firstElement = bufferBlock.Receive();

try
{
    bufferBlock.Completion.Wait();
}
catch(AggregateException ex)
{
    Console.WriteLine(ex.InnerException.Message);
}

public class InvalidSource<T> : ISourceBlock<T>
{
    T message;

    public InvalidSource(T message)
    {
        this.message= message;
    }

    T ISourceBlock<T>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<T> target, 
        out bool messageConsumed)
    {
        throw new Exception("Test");
    }

    public IDisposable LinkTo(ITargetBlock<T> target, 
        DataflowLinkOptions linkOptions)
    {
        target.OfferMessage(new DataflowMessageHeader(1), 
            this.message, this, true);
        return null;
    }

    void ISourceBlock<T>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        throw new NotImplementedException();
    }

    bool ISourceBlock<T>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        throw new NotImplementedException();
    }

    public void Complete()
    {
        throw new NotImplementedException();
    }

    public Task Completion
    {
        get { throw new NotImplementedException(); }
    }

    void IDataflowBlock.Fault(Exception exception)
    {
        throw new NotImplementedException();
    }
}

Behaviors of a “Faulted” Block

After a block becomes faulted:

  • It should stop processing additional messages.
  • It should clear its message queues, both input and output.
  • It should fault its Completion task.
  • If it is a target:
    • It should decline any further incoming messages.
    • It should release any reserved messages.
    • Optionally, it may reserve and then release any postponed messages to notify the sources they will never be needed.
  • If it is a source:
    • It should stop offering messages.
    • It should transfer the exception to any targets currently linked where those links were created with DataflowLinkOptions.PropagateCompletion == true.
    • It should unlink from currently linked targets.
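Some of these behaviors can be observed from the outside. The sketch below, which assumes a BufferBlock that is explicitly faulted while it still holds two messages, checks that the queue has been cleared and that further messages are declined. The class name FaultedBlockDemo and the method RunAsync are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; names are for illustration only.
public static class FaultedBlockDemo
{
    // Returns whether a post-fault message was accepted and how many
    // messages remain buffered once the block has completed.
    public static async Task<(bool Accepted, int Count)> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        buffer.Post(1);
        buffer.Post(2);

        // Explicitly fault the block while it still holds messages.
        ((IDataflowBlock)buffer).Fault(new InvalidOperationException("Test"));

        try
        {
            // Wait until the block has finished moving to the Faulted state.
            await buffer.Completion;
        }
        catch (Exception)
        {
            // The fault itself is expected here.
        }

        bool accepted = buffer.Post(3); // a faulted block declines new messages
        int count = buffer.Count;       // the buffered messages were dropped

        return (accepted, count);
    }

    public static async Task Main()
    {
        var result = await RunAsync();
        Console.WriteLine("Accepted: " + result.Accepted + ", Count: " + result.Count);
    }
}
```

The sketch waits for Completion before inspecting the block because the cleanup after Fault may run asynchronously.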

 

Behaviors of a network with “Faulted” blocks

1. Reserved Messages

In order to avoid message corruption, a faulted block should clear its message queues and move into the Faulted state as soon as possible. There is a single scenario that does not obey this rule: a source block holding a message reserved by a target. If a block that encounters an internal exception has a message that was reserved by a target, the reserved message must not be dropped, and the block should not be moved into the Faulted state until the message is released or consumed. Here is an example.

BufferBlock<int> bufferBlock = new BufferBlock<int>();
ReservingTarget<int> target = new ReservingTarget<int>();

bufferBlock.LinkTo(target);
bufferBlock.Post(1);
bufferBlock.Post(2);
bufferBlock.Post(3);

//give time to have the message reserved
Thread.Sleep(1000);

((IDataflowBlock)bufferBlock).Fault(new Exception("Test"));
bool completionBlock = bufferBlock.Completion.Wait(2000);
Console.WriteLine("Block is completed {0}. ", completionBlock);

//consume the message and block will be completed
target.ConsumeMessage();

try
{
    bufferBlock.Completion.Wait();
}
catch (AggregateException ex)
{
    Console.WriteLine(ex.InnerException.Message);
}

class ReservingTarget<T> : ITargetBlock<T>
{
    DataflowMessageHeader messageHeader;
    ISourceBlock<T> source;

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, 
        ISourceBlock<T> source, bool consumeToAccept)
    {
        this.messageHeader = messageHeader;
        this.source = source;

        //reserve
        Task.Factory.StartNew(() => 
            { source.ReserveMessage(messageHeader, this); });

        return DataflowMessageStatus.Postponed;
    }

    public void ConsumeMessage()
    {
        bool msgConsumeStatus = false;
        this.source.ConsumeMessage(this.messageHeader, 
            this, out msgConsumeStatus);
    }

    public void Complete()
    {
        throw new NotImplementedException();
    }

    public Task Completion
    {
        get { throw new NotImplementedException(); }
    }

    void IDataflowBlock.Fault(Exception exception)
    {
        throw new NotImplementedException();
    }
}

2. Hanging Networks

Dataflow blocks may be written to expect a specific number of input messages. If an earlier block in the network faults, such a block may never receive the expected number of messages and the network may hang waiting for them: a faulted block explicitly drops messages it has already processed but not yet forwarded, and it declines further messages offered to it. There are two ways to observe that a block in a network has faulted:

  • Keep a reference to all the blocks in the network and use Task.WaitAll or Task.WhenAll to wait for them (synchronously or asynchronously). If a block faults, its Completion task will complete in the Faulted state.
  • Use DataflowLinkOptions with PropagateCompletion == true when building a linear network. That will propagate block completion from source to target. In this case it is enough to wait on the network leaf block.
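The second approach can be sketched as follows, assuming a two-block linear network in which the first block throws for one input. The class name NetworkFaultDemo, the method RunAsync, and the message text are hypothetical. Because a propagated exception may arrive wrapped in nested AggregateException instances, the sketch calls Flatten() before reading the original exception.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; names are for illustration only.
public static class NetworkFaultDemo
{
    // Returns the message of the exception observed on the leaf block.
    public static async Task<string> RunAsync()
    {
        var transform = new TransformBlock<int, int>(x =>
        {
            // Assumption for the demo: input 2 is invalid.
            if (x == 2) throw new InvalidOperationException("bad input 2");
            return x * 10;
        });

        var leaf = new ActionBlock<int>(x => Console.WriteLine("Processed " + x));

        // Completion, including a fault, flows from transform to leaf.
        transform.LinkTo(leaf, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 3; i++) transform.Post(i);
        transform.Complete();

        try
        {
            // Waiting on the leaf is enough to observe the upstream fault.
            await leaf.Completion;
        }
        catch (Exception)
        {
            // Details are read from the Completion task below.
        }

        // Flatten removes any nesting introduced by propagation.
        return leaf.Completion.Exception.Flatten().InnerException.Message;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

For the first approach, the same network could instead be observed with Task.WhenAll(transform.Completion, leaf.Completion), which requires keeping a reference to every block but does not depend on how the links were configured.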

 

Behavior of Extension Methods on Faulted Blocks

  • Choose: If all provided blocks have faulted, the returned task will be completed in the Canceled state.
  • LinkTo: If the source block is completed (faulted or otherwise), no link should be created. The IDisposable returned from the call will have a Dispose method that’s effectively a nop.
  • OutputAvailableAsync: If the source block is faulted, the returned Task<Boolean> will be completed in the RanToCompletion state and with its Result equal to false.
  • Post: If the target block is declining further messages (e.g. because it’s completed, because it experienced an unhandled exception and is shutting down, etc.), the result of the Post method will be false.
  • Receive: If the source block is faulted, this will throw an InvalidOperationException.
  • ReceiveAsync: If the source block is faulted, the returned Task will complete in the Faulted state with the InvalidOperationException representing no data available (just as in the Receive case).
  • SendAsync: As with Post, if the target block is declining further messages (e.g. because it’s completed, because it experienced an unhandled exception and is shutting down, etc.), the returned task will complete in the RanToCompletion state with its Result equal to false.
  • TryReceive: Trying to receive from a faulted block will return false.
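A few of the behaviors listed above can be checked with a short program. This is a sketch under the assumption that the block is a BufferBlock that has already finished faulting; the class name FaultedExtensionsDemo and the method RunAsync are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; names are for illustration only.
public static class FaultedExtensionsDemo
{
    public static async Task<(bool Sent, bool TryReceived, bool Available, bool ReceiveThrew)> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        buffer.Post(1);
        ((IDataflowBlock)buffer).Fault(new Exception("Test"));

        try
        {
            // Make sure the block has fully transitioned before probing it.
            await buffer.Completion;
        }
        catch (Exception)
        {
            // The fault itself is expected here.
        }

        // SendAsync completes successfully, but with a false result.
        bool sent = await buffer.SendAsync(2);

        // TryReceive reports that no item is available.
        bool tryReceived = buffer.TryReceive(out int item);

        // OutputAvailableAsync completes with false.
        bool available = await buffer.OutputAvailableAsync();

        // Receive throws because no data will ever be available.
        bool receiveThrew = false;
        try
        {
            buffer.Receive();
        }
        catch (InvalidOperationException)
        {
            receiveThrew = true;
        }

        return (sent, tryReceived, available, receiveThrew);
    }

    public static async Task Main()
    {
        var r = await RunAsync();
        Console.WriteLine("SendAsync: " + r.Sent);
        Console.WriteLine("TryReceive: " + r.TryReceived);
        Console.WriteLine("OutputAvailableAsync: " + r.Available);
        Console.WriteLine("Receive threw: " + r.ReceiveThrew);
    }
}
```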

 

Exception Handling Design Guidelines for new Dataflow Blocks

It is recommended that new dataflow blocks be implemented according to the following guidelines:

  1. Avoid throwing exceptions in the implementation of the block’s interface methods.
  2. The block should propagate the completion state to a linked target if, and only if, the blocks were linked using DataflowLinkOptions that has its PropagateCompletion property set to true.
  3. Consider faulting the block when an exception is raised during message processing.
  4. Consider clearing the block’s input and output queues when the block moves into a completed state.
  5. Consider releasing any postponed messages when the block moves into a completed state.
