October 1st, 2018

The danger of TaskCompletionSource class

Sergey Tepliakov
Senior Software Engineer

… when used with async/await.

TaskCompletionSource<T> is a very useful facility when you want to control the lifetime of a task manually. Here is a canonical example in which TaskCompletionSource is used to convert event-based asynchronous code to the Task-based pattern:

public static Task PerformOperation(this PictureBox pictureBox)
{
    var tcs = new TaskCompletionSource<object>();
            
    // Naive version that does not unsubscribe from the event
    pictureBox.LoadCompleted += (s, ea) =>
    {
        if (ea.Cancelled) tcs.SetCanceled();
        else if (ea.Error != null) tcs.SetException(ea.Error);
        else tcs.SetResult(null);
    };
 
    pictureBox.LoadAsync();
 
    return tcs.Task;
}

Effectively, TaskCompletionSource<T> represents a future result and lets you set the final state of the underlying task manually by calling the SetCanceled, SetException or SetResult methods.
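The naive version above never detaches its handler. Here is a minimal sketch of one way to unsubscribe once the event fires. The Downloader class, its Completed event and the HandlerCount property are hypothetical stand-ins for PictureBox (they are not part of the original example) so that the snippet can run in a console app:

```csharp
using System;
using System.ComponentModel;
using System.Threading.Tasks;

// Hypothetical event-based component standing in for PictureBox.
public sealed class Downloader
{
    public event AsyncCompletedEventHandler Completed;

    // Number of handlers currently attached (for illustration only).
    public int HandlerCount => Completed?.GetInvocationList().Length ?? 0;

    // Raises Completed from a thread-pool thread to emulate an async operation.
    public void StartAsync() =>
        Task.Run(() => Completed?.Invoke(this, new AsyncCompletedEventArgs(null, false, null)));
}

public static class DownloaderExtensions
{
    public static Task DownloadAsync(this Downloader downloader)
    {
        var tcs = new TaskCompletionSource<object>();

        AsyncCompletedEventHandler handler = null;
        handler = (s, ea) =>
        {
            // Detach first so the handler neither leaks nor fires twice.
            downloader.Completed -= handler;

            if (ea.Cancelled) tcs.TrySetCanceled();
            else if (ea.Error != null) tcs.TrySetException(ea.Error);
            else tcs.TrySetResult(null);
        };

        downloader.Completed += handler;
        downloader.StartAsync();
        return tcs.Task;
    }
}

public static class EventToTaskDemo
{
    public static async Task Main()
    {
        var downloader = new Downloader();
        await downloader.DownloadAsync();
        Console.WriteLine($"Handlers left: {downloader.HandlerCount}");
    }
}
```

The handler removes itself before completing the task, so by the time the awaiting code resumes no subscription is left behind.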

This class is useful not only when you need to make old code look modern and fancy. TaskCompletionSource<T> is used in a variety of cases where an operation’s lifetime is controlled manually, for instance, in different communication protocols. So, let’s mimic one of them.

Let’s suppose we want to create a custom database adapter. The adapter will have a dedicated “worker” thread for processing requests and a SaveAsync method that a client can use to schedule work for background processing. This is quite similar to what an actual Redis client does, and some other database clients follow the same pattern, so this is not a far-fetched scenario.

public class DatabaseFacade : IDisposable
{
    private readonly BlockingCollection<(string item, TaskCompletionSource<string> result)> _queue =
        new BlockingCollection<(string item, TaskCompletionSource<string> result)>();
    private readonly Task _processItemsTask;
 
    public DatabaseFacade() => _processItemsTask = Task.Run(ProcessItems);
 
    public void Dispose() => _queue.CompleteAdding();
 
    public Task SaveAsync(string command)
    {
        var tcs = new TaskCompletionSource<string>();
        _queue.Add((item: command, result: tcs));
        return tcs.Task;
    }
 
    private async Task ProcessItems()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            Console.WriteLine($"DatabaseFacade: executing '{item.item}'...");
 
            // Waiting a bit to emulate some IO-bound operation
            await Task.Delay(100);
            item.result.SetResult("OK");
            Console.WriteLine("DatabaseFacade: done.");
        }
    }
}

The code is not quite production ready, but it’s a good example of a producer-consumer pattern based on BlockingCollection.

Suppose we have another component, let’s say a logger. A logger is usually implemented using the producer-consumer pattern as well. For performance reasons, we don’t want to flush the messages on each method call, and instead we can use a blocking collection and a dedicated thread for saving data to external sources. And one of the external sources could be a database.

public class Logger : IDisposable
{
    private readonly DatabaseFacade _facade;
    private readonly BlockingCollection<string> _queue =
        new BlockingCollection<string>();
 
    private readonly Task _saveMessageTask;
 
    public Logger(DatabaseFacade facade) =>
        (_facade, _saveMessageTask) = (facade, Task.Run(SaveMessage));
 
    public void Dispose() => _queue.CompleteAdding();
 
    public void WriteLine(string message) => _queue.Add(message);
 
    private async Task SaveMessage()
    {
        foreach (var message in _queue.GetConsumingEnumerable())
        {
            // "Saving" message to the file
            Console.WriteLine($"Logger: {message}");
 
            // And to our database through the facade
            await _facade.SaveAsync(message);
        }
    }
}

The logger’s implementation is extremely naive and you should by no means write yet another logger yourself. My goal here is to show how two producer-consumer queues may affect each other, and a logger is a widely known concept that is easy to understand.

The question is: can you see the issue here? Like a really serious issue!

Let’s try to run the following code:

using (var facade = new DatabaseFacade())
using (var logger = new Logger(facade))
{
    logger.WriteLine("My message");
    await Task.Delay(100);
 
    await facade.SaveAsync("Another string");
    Console.WriteLine("The string is saved");
}

The output is:

Logger: My message
DatabaseFacade: executing 'My message'...

We never save “Another string” into the database. Why? Because the database facade’s worker thread is blocked inside the logger’s code: the logger’s continuation runs on the facade’s thread and then waits there for the next log message.

TaskCompletionSource has a very peculiar behavior: by default, when the SetResult method is called, all the task’s “async” continuations are invoked … synchronously. That’s what is happening in our case (the same is true for SetCanceled, SetException, as well as their TrySetXXX counterparts):

[Figure: The workflow]

It means that the two “queues” are implicitly linked together and the logger’s queue blocks the adapter’s queue.

Unfortunately, situations like this are relatively common and I’ve faced them several times in my projects. The issue may occur when “a continuation” (*) of a task backed by TaskCompletionSource<T> blocks the thread in one way or another, thereby blocking the thread that calls SetResult.

(*) As we’ll see in a moment, different types of continuations behave differently.

The main challenge with such issues is that it’s very hard to understand the root cause. When you have a dump of a process from a production machine, you may see no obvious issues at all. You could have a bunch of threads waiting on kernel objects without any relevant user code in any of the stack traces.

Now let’s see why this is happening and how we can mitigate the issue.

Each task has a field called m_stateFlags that represents the current state of the task (like RanToCompletion, Canceled, Faulted etc.). But this is not the only role of the field: it also contains a set of flags specified during task creation via TaskCreationOptions. These flags control different aspects, like whether to run the task in a dedicated thread (TaskCreationOptions.LongRunning), to schedule a work item into a global queue instead of a thread-local one (TaskCreationOptions.PreferFairness), or to force task continuations to always run asynchronously (TaskCreationOptions.RunContinuationsAsynchronously).
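The m_stateFlags field itself is private, but the creation options stored in it can be read back through the public Task.CreationOptions property. Here is a small sketch that prints the options of a few tasks; the CreationOptionsDemo class and the OptionsOf helper are names made up for this illustration:

```csharp
using System;
using System.Threading.Tasks;

public static class CreationOptionsDemo
{
    // Hypothetical helper: reads the options the underlying task was created with.
    public static TaskCreationOptions OptionsOf(TaskCompletionSource<int> tcs) =>
        tcs.Task.CreationOptions;

    public static void Main()
    {
        var plain = new TaskCompletionSource<int>();
        var asyncContinuations = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        Console.WriteLine(OptionsOf(plain));
        Console.WriteLine(OptionsOf(asyncContinuations));

        // LongRunning hints the scheduler to use a dedicated thread.
        Task longRunning = Task.Factory.StartNew(
            () => { }, TaskCreationOptions.LongRunning);
        longRunning.Wait();
        Console.WriteLine(longRunning.CreationOptions);
    }
}
```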

Obviously, we’re interested in the latter aspect, and we’ll see in a moment how you can specify this flag. But to fully understand the issue we need to look at the other aspect as well: we need to understand task continuations.

static async Task WithAsync()
{
    var task = Task.Run(() => { Thread.Sleep(100); });
    await task;
    Console.WriteLine("After task await");
}
 
static Task WithContinueWith()
{
    var task = Task.Run(() => { Thread.Sleep(100); });
    return task.ContinueWith(
        t => { Console.WriteLine("Inside ContinueWith"); },
        TaskContinuationOptions.OnlyOnRanToCompletion);
}

You may think that the two implementations are equivalent because the compiler just “moves” the block of code between await statements into a continuation scheduled via ContinueWith. But this is only kind-of true, and the actual logic is a bit more involved.

The actual transformation that the C# compiler performs for async methods is described in more detail in my other post, “Dissecting async methods in C#”; here we’ll focus on one particular aspect: continuation scheduling.

When an awaited task is not finished, the generated state machine calls TaskAwaiter.UnsafeOnCompleted and passes a callback that is invoked when the awaited task is done, to move the state machine forward. This method calls Task.SetContinuationForAwait to add the given action as the task’s continuation:

// Now register the continuation, and if we couldn't register it because the task is already completing,
// process the continuation directly (in which case make sure we schedule the continuation
// rather than inlining it, the latter of which could result in a rare but possible stack overflow).
if (tc != null)
{
    if (!AddTaskContinuation(tc, addBeforeOthers: false))
        tc.Run(this, bCanInlineContinuationTask: false);
}
else
{
    Contract.Assert(!flowExecutionContext, "We already determined we're not required to flow context.");
    if (!AddTaskContinuation(continuationAction, addBeforeOthers: false))
        AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this);
}

The local variable tc is not null if a synchronization context is involved; otherwise, the else block is executed. AddTaskContinuation is called first; it returns true when the current task is not yet finished and the given action was successfully added as a continuation of the current task. Otherwise, UnsafeScheduleAction is called, which creates an AwaitTaskContinuation instance and schedules it instead of running it inline (to prevent a possible stack overflow).

In a common case (more details later) a System.Action instance is added as a task continuation and the continuation is stored in Task.m_continuationObject.

Now, let’s see what happens when a task is finished (code snippet from Task.FinishContinuations):

internal void FinishContinuations()
{
    // Atomically store the fact that this task is completing.  From this point on, the adding of continuations will
    // result in the continuations being run/launched directly rather than being added to the continuation list.
    object continuationObject = Interlocked.Exchange(ref m_continuationObject, s_taskCompletionSentinel);
 
    // If continuationObject == null, then we don't have any continuations to process
    if (continuationObject != null)
    {
        // Skip synchronous execution of continuations if this task's thread was aborted
        bool bCanInlineContinuations = !(((m_stateFlags & TASK_STATE_THREAD_WAS_ABORTED) != 0) ||
                                            (Thread.CurrentThread.ThreadState == ThreadState.AbortRequested) ||
                                            ((m_stateFlags & (int)TaskCreationOptions.RunContinuationsAsynchronously) != 0));
 
        // Handle the single-Action case
        Action singleAction = continuationObject as Action;
        if (singleAction != null)
        {
            AwaitTaskContinuation.RunOrScheduleAction(singleAction, bCanInlineContinuations, ref t_currentTask);
            return;
        }
        // The rest of the body
    }
}

The FinishContinuations method checks the task creation flags, and if RunContinuationsAsynchronously was not specified, it runs a single action continuation synchronously! The behavior is different for async/await and for task.ContinueWith. A continuation of an async method is invoked synchronously unless the task is already finished (**), or a synchronization context or a non-default task scheduler is used. It means that an “async” continuation runs synchronously almost all the time when the awaited task is not finished!

(**) This is kind of a rare race condition. If an awaited task is already finished at the ‘await site’ (like at await finishedTask), the async method simply continues its execution synchronously. The case meant here is only possible when a task finishes in the middle of a MoveNext call of the generated state machine.
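The simple “already finished at the await site” case is easy to observe. The sketch below (CompletedAwaitDemo and StaysOnSameThread are names made up for this illustration) awaits a task that is already completed and checks that no thread switch happens and that the async method itself finishes synchronously:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CompletedAwaitDemo
{
    public static async Task<bool> StaysOnSameThread()
    {
        Thread before = Thread.CurrentThread;
        // The task is already completed, so no continuation is registered
        // and the method just keeps running.
        await Task.FromResult(42);
        return ReferenceEquals(before, Thread.CurrentThread);
    }

    public static void Main()
    {
        Task<bool> task = StaysOnSameThread();
        // The method ran to its end synchronously, so the task is already completed here.
        Console.WriteLine($"Completed synchronously: {task.IsCompleted}, same thread: {task.Result}");
    }
}
```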

But the logic for continuations scheduled by Task.ContinueWith is different: in this case, a StandardTaskContinuation instance is created and added as the task’s continuation. Regardless of the task creation options, this continuation runs asynchronously unless the TaskContinuationOptions.ExecuteSynchronously flag is specified.
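To compare the three kinds of continuations side by side, here is a sketch that registers an await continuation, a plain ContinueWith and a ContinueWith with ExecuteSynchronously on the same task, and then completes the task from a dedicated thread. The ContinuationKindsDemo class is made up for this illustration, and the sketch assumes a console app with no synchronization context and the default task scheduler:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationKindsDemo
{
    // For each kind of continuation, reports whether it ran on the thread that called SetResult.
    public static (bool afterAwait, bool continueWith, bool continueWithSync) Run()
    {
        var tcs = new TaskCompletionSource<object>(); // default creation options

        // A dedicated (non-pool) thread will complete the task.
        var setter = new Thread(() => tcs.SetResult(null));

        bool afterAwait = false, continueWith = false, continueWithSync = false;

        async Task AwaitIt()
        {
            await tcs.Task;
            afterAwait = ReferenceEquals(Thread.CurrentThread, setter);
        }

        // Register all three continuations before the task completes.
        Task viaAwait = AwaitIt();
        Task viaContinueWith = tcs.Task.ContinueWith(
            _ => { continueWith = ReferenceEquals(Thread.CurrentThread, setter); });
        Task viaContinueWithSync = tcs.Task.ContinueWith(
            _ => { continueWithSync = ReferenceEquals(Thread.CurrentThread, setter); },
            TaskContinuationOptions.ExecuteSynchronously);

        setter.Start();
        setter.Join();
        Task.WaitAll(viaAwait, viaContinueWith, viaContinueWithSync);

        return (afterAwait, continueWith, continueWithSync);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine($"await on setter thread: {result.afterAwait}");
        Console.WriteLine($"ContinueWith on setter thread: {result.continueWith}");
        Console.WriteLine($"ContinueWith(ExecuteSynchronously) on setter thread: {result.continueWithSync}");
    }
}
```

Under these assumptions only the plain ContinueWith continuation should leave the thread that calls SetResult; the other two run inline on it.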

We can actually check that the issue we faced at the beginning has nothing to do with TaskCompletionSource per se and manifests itself with any task created without TaskCreationOptions.RunContinuationsAsynchronously:

// Print is assumed to write the message followed by the current managed thread id;
// Sleep is Thread.Sleep.
static async Task WithAsync()
{
    Print("WithAsync");
    var task = Task.Run(
        () => { Sleep(100); Print("In Task.Run"); });
    await task;
    Print("After task await");
}
 
static Task WithContinueWith()
{
    Print("WithContinueWith");
    var task = Task.Run(
        () => { Sleep(100); Print("In Task.Run"); });
    var result = task.ContinueWith(
        t => { Print("Inside ContinueWith"); });
    return result;
}

await WithAsync();
await WithContinueWith();

The output:

WithAsync: 1
In Task.Run: 3
After task await: 3

WithContinueWith: 3
In Task.Run: 4
Inside ContinueWith: 5

As we can see, the code after the await runs synchronously in the same thread that ran the Task.Run delegate. But the continuation scheduled with task.ContinueWith runs asynchronously in a different thread. We can change the behavior by using Task.Factory.StartNew and providing TaskCreationOptions.RunContinuationsAsynchronously:

static async Task WithAsync()
{
    Print("WithAsync");
    var task = Task.Factory.StartNew(
        () => { Sleep(100); Print("In Task.Factory.StartNew"); },
        TaskCreationOptions.RunContinuationsAsynchronously);
    await task;
    Print("After task await");
}

The output:

WithAsync: 1
In Task.Factory.StartNew: 3
After task await: 4

How to solve the issue?

As we discussed already, there are two pieces involved here: 1) task creation options that control how to run continuations (synchronously, if possible, by default) and 2) the type of continuation.

There is nothing you can do to control the behavior of async/await itself. If you can’t control a task’s creation but want to run the continuations asynchronously, you can explicitly call Task.Yield() right after await or switch to custom tasks altogether (which is hardly an option).
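Here is a rough sketch of the Task.Yield() workaround. The YieldAfterAwaitDemo class is made up for this illustration, and the sketch assumes a console app without a synchronization context, where Task.Yield() reschedules the rest of the method to the thread pool:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class YieldAfterAwaitDemo
{
    // Reports whether the code ran on the thread that called SetResult
    // right after the await and after the following Task.Yield().
    public static (bool beforeYield, bool afterYield) Run()
    {
        // Imagine this source is created by code we cannot change.
        var tcs = new TaskCompletionSource<object>();
        var setter = new Thread(() => tcs.SetResult(null));

        bool beforeYield = false, afterYield = false;

        async Task Consumer()
        {
            await tcs.Task;
            // Still running inline on the thread that called SetResult.
            beforeYield = ReferenceEquals(Thread.CurrentThread, setter);

            await Task.Yield();
            // Rescheduled: the rest of the method no longer holds the setter thread.
            afterYield = ReferenceEquals(Thread.CurrentThread, setter);
        }

        Task consumer = Consumer();
        setter.Start();
        setter.Join();
        consumer.Wait();

        return (beforeYield, afterYield);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine($"On setter thread before Yield: {result.beforeYield}");
        Console.WriteLine($"On setter thread after Yield: {result.afterYield}");
    }
}
```

Note that the code between the await and Task.Yield() still runs on the thread that called SetResult, so any blocking work has to come after the yield.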

But if you can, you should provide task creation options every time you use TaskCompletionSource<T>:

static async Task WithAsync(TaskCreationOptions options)
{
    Print($"WithAsync. Options: {options}");
    var tcs = new TaskCompletionSource<object>(options);
 
    var setTask = Task.Run(
        () => {
            Sleep(100);
            Print("Setting task's result");
            tcs.SetResult(null);
            Print("Set task's result");
        });
 
    await tcs.Task;

    Print("After task await");
    await setTask;
}
 
await WithAsync(TaskCreationOptions.None);
await WithAsync(TaskCreationOptions.RunContinuationsAsynchronously);

The output is:

WithAsync. Options: None: 1
Setting task's result: 3
After task await: 3
Set task's result: 3

WithAsync. Options: RunContinuationsAsynchronously: 3
Setting task's result: 4
Set task's result: 4
After task await: 3

Starting from .NET 4.6.1, TaskCompletionSource accepts TaskCreationOptions.RunContinuationsAsynchronously. If this flag is specified, then all the continuations (including “async” continuations) are executed asynchronously. This removes the implicit coupling that may occur when many async methods are chained together and one of the tasks in this chain is based on TaskCompletionSource.
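The coupling can be made visible with a small experiment. In the sketch below (BlockedSetterDemo and SetterReturnsPromptly are names made up for this illustration; a console app without a synchronization context is assumed) the awaiting code blocks right after the await, and we check whether the thread that called SetResult gets control back within half a second:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BlockedSetterDemo
{
    // Returns true if the thread calling SetResult got control back
    // while the awaiting code was still blocked.
    public static bool SetterReturnsPromptly(TaskCreationOptions options)
    {
        var tcs = new TaskCompletionSource<string>(options);
        var gate = new ManualResetEventSlim(false);

        async Task BlockingConsumer()
        {
            await tcs.Task;
            gate.Wait(); // blocks whichever thread runs this continuation
        }

        Task consumer = BlockingConsumer();

        var setter = new Thread(() => tcs.SetResult("OK"));
        setter.Start();
        bool returned = setter.Join(TimeSpan.FromMilliseconds(500));

        gate.Set(); // unblock the consumer so the demo can finish
        consumer.Wait();
        setter.Join();
        gate.Dispose();
        return returned;
    }

    public static void Main()
    {
        Console.WriteLine(SetterReturnsPromptly(TaskCreationOptions.None));
        Console.WriteLine(SetterReturnsPromptly(TaskCreationOptions.RunContinuationsAsynchronously));
    }
}
```

With TaskCreationOptions.None the setter thread is expected to stay stuck inside the consumer’s continuation until the gate is opened, while with RunContinuationsAsynchronously it should return right away. Applied to the DatabaseFacade above, the change would be confined to SaveAsync: construct the TaskCompletionSource<string> with TaskCreationOptions.RunContinuationsAsynchronously.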

Conclusion

  • The TaskCompletionSource class was introduced in .NET 4.0, in the pre-async era, for controlling a task’s lifetime manually.
  • By default, a task’s “async” continuations are executed synchronously unless the TaskCreationOptions.RunContinuationsAsynchronously option is specified; continuations scheduled with ContinueWith run asynchronously unless TaskContinuationOptions.ExecuteSynchronously is specified.
  • By default, “async” continuations (blocks between await statements) run in the thread that completes the awaited task, unless a synchronization context or a non-default task scheduler is involved.
  • TaskCompletionSource instances created with the default constructor may cause deadlocks and other threading issues by running all “async” continuations in the thread that sets the result of a task.
  • If you use .NET 4.6.1+ you should always provide TaskCreationOptions.RunContinuationsAsynchronously when creating TaskCompletionSource instances.

Author

Sergey Tepliakov
Senior Software Engineer

Lead software developer and software architect. Specialties: C#, C++, OOD, Design Patterns, Unit Testing, Multithreading, TPL
