… when used with async/await.
The TaskCompletionSource<T> class is a very useful facility if you want to control the lifetime of a task manually. Here is a canonical example where TaskCompletionSource is used to convert event-based asynchronous code to the Task-based pattern:
    public static Task PerformOperation(this PictureBox pictureBox)
    {
        var tcs = new TaskCompletionSource<object>();

        // Naive version that does not unsubscribe from the event
        pictureBox.LoadCompleted += (s, ea) =>
        {
            if (ea.Cancelled) tcs.SetCanceled();
            else if (ea.Error != null) tcs.SetException(ea.Error);
            else tcs.SetResult(null);
        };

        pictureBox.LoadAsync();
        return tcs.Task;
    }
Effectively, TaskCompletionSource<T> represents a future result and lets you set the final state of the underlying task manually by calling the SetCanceled, SetException, or SetResult methods.
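The example above is explicitly the naive version. A minimal sketch of a wrapper that also unsubscribes from the event could look like the following. Note that Downloader, its Completed event, StartAsync, and DownloadAsync are hypothetical names invented for illustration (they stand in for PictureBox and LoadCompleted so that the sample runs without WinForms), and the TrySetXXX methods are used only so that a second event raise cannot throw.

```csharp
using System;
using System.ComponentModel;
using System.Threading.Tasks;

// Hypothetical event-based component, used only for illustration.
public sealed class Downloader
{
    public event EventHandler<AsyncCompletedEventArgs> Completed;

    // Raises Completed from a thread-pool thread to emulate an asynchronous operation.
    public void StartAsync() =>
        Task.Run(() => Completed?.Invoke(this, new AsyncCompletedEventArgs(null, false, null)));
}

public static class DownloaderExtensions
{
    public static Task DownloadAsync(this Downloader downloader)
    {
        var tcs = new TaskCompletionSource<object>();

        EventHandler<AsyncCompletedEventArgs> handler = null;
        handler = (s, ea) =>
        {
            // Unsubscribe first so the handler runs at most once and is not kept alive by the event.
            downloader.Completed -= handler;

            if (ea.Cancelled) tcs.TrySetCanceled();
            else if (ea.Error != null) tcs.TrySetException(ea.Error);
            else tcs.TrySetResult(null);
        };

        downloader.Completed += handler;
        downloader.StartAsync();
        return tcs.Task;
    }
}

public static class Program
{
    public static async Task Main()
    {
        await new Downloader().DownloadAsync();
        Console.WriteLine("Completed");
    }
}
```

The handler has to be declared before it is assigned so that the lambda can refer to itself when unsubscribing.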
This class is useful not only when you need to make old code look modern and fancy. TaskCompletionSource<T> is used in a variety of cases when an operation’s lifetime is controlled manually, for instance, in different communication protocols. So, let’s mimic one of them.
Let’s suppose we want to create a custom database adapter. The adapter will have a dedicated “worker” thread for processing requests and a SaveAsync method that a client can use to schedule work for background processing. This is quite similar to what an actual Redis client does, and some other database clients follow the same pattern, so this is not a far-fetched scenario.
    public class DatabaseFacade : IDisposable
    {
        private readonly BlockingCollection<(string item, TaskCompletionSource<string> result)> _queue =
            new BlockingCollection<(string item, TaskCompletionSource<string> result)>();
        private readonly Task _processItemsTask;

        public DatabaseFacade() => _processItemsTask = Task.Run(ProcessItems);

        public void Dispose() => _queue.CompleteAdding();

        public Task SaveAsync(string command)
        {
            var tcs = new TaskCompletionSource<string>();
            _queue.Add((item: command, result: tcs));
            return tcs.Task;
        }

        private async Task ProcessItems()
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                Console.WriteLine($"DatabaseFacade: executing '{item.item}'...");

                // Waiting a bit to emulate some IO-bound operation
                await Task.Delay(100);
                item.result.SetResult("OK");
                Console.WriteLine("DatabaseFacade: done.");
            }
        }
    }
The code is not quite production ready, but it’s a good example of a producer-consumer pattern based on BlockingCollection.
Suppose we have another component, let’s say a logger. A logger is usually implemented using the producer-consumer pattern as well. For performance reasons, we don’t want to flush the messages on each method call, and instead we can use a blocking collection and a dedicated thread for saving data to external sources. And one of the external sources could be a database.
    public class Logger : IDisposable
    {
        private readonly DatabaseFacade _facade;
        private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
        private readonly Task _saveMessageTask;

        public Logger(DatabaseFacade facade) =>
            (_facade, _saveMessageTask) = (facade, Task.Run(SaveMessage));

        public void Dispose() => _queue.CompleteAdding();

        public void WriteLine(string message) => _queue.Add(message);

        private async Task SaveMessage()
        {
            foreach (var message in _queue.GetConsumingEnumerable())
            {
                // "Saving" message to the file
                Console.WriteLine($"Logger: {message}");

                // And to our database through the facade
                await _facade.SaveAsync(message);
            }
        }
    }
The logger’s implementation is extremely naive, and by all means you should not write yet another logger yourself. My goal here is to show how two producer-consumer queues may affect each other, and a logger is a widely known concept that is easy to understand.
The question is: can you see the issue here? Like a really serious issue!
Let’s try to run the following code:
    using (var facade = new DatabaseFacade())
    using (var logger = new Logger(facade))
    {
        logger.WriteLine("My message");
        await Task.Delay(100);

        await facade.SaveAsync("Another string");
        Console.WriteLine("The string is saved");
    }
The output is:
    Logger: My message
    DatabaseFacade: executing 'My message'...
We never save “Another string” into the database. Why? Because the database facade’s thread is blocked by the logger’s thread.
The TaskCompletionSource type has a very peculiar behavior: by default, when the SetResult method is called, all the task’s “async” continuations are invoked … synchronously. That’s what is happening in our case (the same is true for SetCanceled and SetException, as well as their TrySetXXX counterparts).
It means that the two “queues” are implicitly linked together and the logger’s queue blocks the adapter’s queue.
Unfortunately, a situation like this is relatively common, and I’ve faced it several times in my projects. The issue may occur when “a continuation” (*) of a task backed by TaskCompletionSource<T> blocks the thread in one way or another, thereby blocking the thread that calls SetResult.
(*) As we’ll see in a moment, different types of continuations behave differently.
The main challenge with such issues is that it’s very hard to understand the root cause. Even if you have a dump of a process from a production machine, you may see no obvious issues at all. You could have a bunch of threads waiting on kernel objects without any relevant user code in any of the stack traces.
Now let’s see why this is happening and how we can mitigate the issue.
Each task has a field called m_stateFlags that represents the current state of the task (like RanToCompletion, Canceled, Faulted, etc.). But this is not the only role of the field: it also contains a set of flags specified during task creation via TaskCreationOptions. These flags control different aspects, like whether to run the task on a dedicated thread (TaskCreationOptions.LongRunning), to schedule the work item into the global queue instead of a thread-local one (TaskCreationOptions.PreferFairness), or to force task continuations to always run asynchronously (TaskCreationOptions.RunContinuationsAsynchronously).
Obviously, we’re interested in the latter aspect, and we’ll see in a moment how you can specify this flag. But to fully understand the issue we need to look at the other aspect as well: we need to understand task continuations.
    static async Task WithAsync()
    {
        var task = Task.Run(() => { Sleep(100); });
        await task;
        Console.WriteLine("After task await");
    }

    static Task WithContinueWith()
    {
        var task = Task.Run(() => { Sleep(100); });
        return task.ContinueWith(
            t => { Console.WriteLine("Inside ContinueWith"); },
            TaskContinuationOptions.OnlyOnRanToCompletion);
    }
You may think that the two implementations are equivalent because the compiler just “moves” the block of code between await statements into a continuation scheduled via ContinueWith. But this is only kind-of true and the actual logic is a bit more involved.
The actual transformation that the C# compiler performs for async methods is described in more detail in my other post “Dissecting async methods in C#”, and here we’ll focus on one particular aspect: continuation scheduling.
When an awaited task is not finished, the generated state machine calls TaskAwaiter.UnsafeOnCompleted and passes a callback that is invoked when the awaited task is done in order to move the state machine forward. This method calls Task.SetContinuationForAwait to add the given action as the task’s continuation:
    // Now register the continuation, and if we couldn't register it because the task is already completing,
    // process the continuation directly (in which case make sure we schedule the continuation
    // rather than inlining it, the latter of which could result in a rare but possible stack overflow).
    if (tc != null)
    {
        if (!AddTaskContinuation(tc, addBeforeOthers: false))
            tc.Run(this, bCanInlineContinuationTask: false);
    }
    else
    {
        Contract.Assert(!flowExecutionContext, "We already determined we're not required to flow context.");
        if (!AddTaskContinuation(continuationAction, addBeforeOthers: false))
            AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this);
    }
The local variable tc is not null if a synchronization context is involved; otherwise, the else block is executed. First, the AddTaskContinuation method is called. It returns true when the current task is not finished and the given action was successfully added as a continuation of the current task. Otherwise, UnsafeScheduleAction is called, which creates an AwaitTaskContinuation instance and schedules the continuation instead of inlining it (to prevent a stack overflow).
In the common case (more details later), a System.Action instance is added as a task continuation and the continuation is stored in Task.m_continuationObject.
Now, let’s see what happens when a task is finished (code snippet from Task.FinishContinuations):
    internal void FinishContinuations()
    {
        // Atomically store the fact that this task is completing. From this point on, the adding of continuations will
        // result in the continuations being run/launched directly rather than being added to the continuation list.
        object continuationObject = Interlocked.Exchange(ref m_continuationObject, s_taskCompletionSentinel);

        // If continuationObject == null, then we don't have any continuations to process
        if (continuationObject != null)
        {
            // Skip synchronous execution of continuations if this task's thread was aborted
            bool bCanInlineContinuations =
                !(((m_stateFlags & TASK_STATE_THREAD_WAS_ABORTED) != 0) ||
                  (Thread.CurrentThread.ThreadState == ThreadState.AbortRequested) ||
                  ((m_stateFlags & (int)TaskCreationOptions.RunContinuationsAsynchronously) != 0));

            // Handle the single-Action case
            Action singleAction = continuationObject as Action;
            if (singleAction != null)
            {
                AwaitTaskContinuation.RunOrScheduleAction(singleAction, bCanInlineContinuations, ref t_currentTask);
                return;
            }

            // The rest of the body
        }
    }
The FinishContinuations method checks the task creation flags, and if RunContinuationsAsynchronously was not specified, it runs a single-action continuation synchronously! The behavior is different for async/await and for task.ContinueWith. A continuation of an async method is invoked synchronously unless the awaited task is already finished (**), or a synchronization context or a non-default task scheduler is used. It means that an “async” continuation runs synchronously almost all the time when the awaited task is not finished!
(**) This is kind of a rare race condition. If an awaited task is already finished at the “await site” (as in await finishedTask), then the async method simply continues its execution synchronously. The situation described here is only possible when the task finishes in the middle of a MoveNext call of the generated state machine.
But the logic for continuations scheduled by Task.ContinueWith is different: in this case, a StandardTaskContinuation instance is created and added as the task’s continuation. This continuation runs asynchronously unless the TaskContinuationOptions.ExecuteSynchronously flag is specified, regardless of the task creation options.
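To make the difference between the three kinds of continuations concrete, here is a small sketch. It assumes a console application (no synchronization context) and completes three TaskCompletionSource instances from a dedicated, non-thread-pool thread, so that "ran on the thread that called SetResult" can be checked by comparing Thread references. All names in the sample are made up for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    public static async Task Main()
    {
        // Default options everywhere: RunContinuationsAsynchronously is NOT specified.
        var forAwait = new TaskCompletionSource<object>();
        var forContinueWith = new TaskCompletionSource<object>();
        var forExecuteSync = new TaskCompletionSource<object>();

        Thread awaitThread = null, continueWithThread = null, executeSyncThread = null;

        async Task AwaitIt()
        {
            await forAwait.Task;
            awaitThread = Thread.CurrentThread;
        }

        // Register all continuations before any of the tasks is completed.
        Task awaited = AwaitIt();
        Task plain = forContinueWith.Task.ContinueWith(
            _ => { continueWithThread = Thread.CurrentThread; });
        Task inlined = forExecuteSync.Task.ContinueWith(
            _ => { executeSyncThread = Thread.CurrentThread; },
            TaskContinuationOptions.ExecuteSynchronously);

        // A dedicated thread plays the role of the component that calls SetResult.
        var setter = new Thread(() =>
        {
            forAwait.SetResult(null);
            forContinueWith.SetResult(null);
            forExecuteSync.SetResult(null);
        });
        setter.Start();

        await Task.WhenAll(awaited, plain, inlined);

        Console.WriteLine($"await continuation on setter thread: {awaitThread == setter}");
        Console.WriteLine($"ContinueWith on setter thread: {continueWithThread == setter}");
        Console.WriteLine($"ExecuteSynchronously on setter thread: {executeSyncThread == setter}");
    }
}
```

Under these assumptions the program prints True, False, True: the "async" continuation and the ExecuteSynchronously continuation are inlined into the SetResult call, while the plain ContinueWith continuation is queued to the thread pool.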
We can actually check that the issue we faced at the beginning has nothing to do with TaskCompletionSource per se and manifests itself with any task created without TaskCreationOptions.RunContinuationsAsynchronously:
    static async Task WithAsync()
    {
        Print("WithAsync");
        var task = Task.Run(
            () =>
            {
                Sleep(100);
                Print("In Task.Run");
            });

        //await Task.Yield();
        await task;
        //await Task.Yield();
        Print("After task await");
    }

    static Task WithContinueWith()
    {
        Print("WithContinueWith");
        var task = Task.Run(
            () =>
            {
                Sleep(100);
                Print("In Task.Run");
            });

        var result = task.ContinueWith(
            t => { Print("Inside ContinueWith"); });
        return result;
    }

    await WithAsync();
    await WithContinueWith();
The output:
    WithAsync: 1
    In Task.Run: 3
    After task await: 3
    WithContinueWith: 3
    In Task.Run: 4
    Inside ContinueWith: 5
As we can see, the rest of the method after the await statement runs synchronously in the same thread that runs the Task.Run delegate. But the continuation scheduled with task.ContinueWith runs asynchronously in a different thread. We can change the behavior by using Task.Factory.StartNew and providing TaskCreationOptions.RunContinuationsAsynchronously:
    static async Task WithAsync()
    {
        Print("WithAsync");
        var task = Task.Factory.StartNew(
            () =>
            {
                Sleep(100);
                Print("In Task.Factory.StartNew");
            },
            TaskCreationOptions.RunContinuationsAsynchronously);

        await task;
        Print("After task await");
    }
    WithAsync: 1
    In Task.Factory.StartNew: 3
    After task await: 4
How to solve the issue?
As we discussed already, there are two pieces involved here: 1) the task creation options that control how continuations are run (synchronously, if possible, by default) and 2) the type of the continuation.
There is nothing you can do to control the behavior of async/await. If you can’t control a task’s creation but want to run the continuations asynchronously, you can explicitly call Task.Yield() right after await or switch to custom tasks altogether (which is hardly an option).
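Here is a minimal sketch of the Task.Yield() workaround, assuming a console application without a synchronization context. The TaskCompletionSource is deliberately created with default options to model a task whose creation we cannot change, and the names Consumer, producer, and release are made up for the sample.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    public static async Task Main()
    {
        // Default options on purpose: we pretend that we cannot change this line.
        var tcs = new TaskCompletionSource<string>();
        var release = new ManualResetEventSlim();

        async Task Consumer()
        {
            await tcs.Task;

            // Leave the thread that called SetResult before doing anything that may block.
            await Task.Yield();

            // The blocking call now runs on a thread-pool thread.
            release.Wait();
        }

        Task consumer = Consumer();

        var producer = new Thread(() =>
        {
            tcs.SetResult("OK");

            // Reached only because SetResult is not stuck inside the consumer's blocking code.
            Console.WriteLine("SetResult returned");
            release.Set();
        });
        producer.Start();

        await consumer;
        Console.WriteLine("Consumer finished");
    }
}
```

If the await Task.Yield() line is removed, the consumer's continuation is inlined into SetResult, blocks on release.Wait(), and the producer never reaches release.Set(), so the sample hangs.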
But if you can, you should provide task creation options every time you use TaskCompletionSource<T>:
    static async Task WithAsync(TaskCreationOptions options)
    {
        Print($"WithAsync. Options: {options}");
        var tcs = new TaskCompletionSource<object>(options);

        var setTask = Task.Run(
            () =>
            {
                Sleep(100);
                Print("Setting task's result");
                tcs.SetResult(null);
                Print("Set task's result");
            });

        //await Task.Yield();
        await tcs.Task;
        Print("After task await");
        await setTask;
    }

    await WithAsync(TaskCreationOptions.None);
    await WithAsync(TaskCreationOptions.RunContinuationsAsynchronously);
The output is:
    WithAsync. Options: None: 1
    Setting task's result: 3
    After task await: 3
    Set task's result: 3
    WithAsync. Options: RunContinuationsAsynchronously: 3
    Setting task's result: 4
    Set task's result: 4
    After task await: 3
Starting from .NET 4.6.1, TaskCompletionSource accepts TaskCreationOptions. If TaskCreationOptions.RunContinuationsAsynchronously is specified, then all the continuations (including “async” continuations) are executed asynchronously. This removes the implicit coupling that may occur when many async methods are chained together and one of the tasks in the chain is based on TaskCompletionSource.
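As a sketch of how this applies to the earlier scenario, here is a trimmed-down variant of the database facade where the only intended change is the constructor argument of TaskCompletionSource. To keep the sample short, the logger is replaced by a hypothetical BlockingClient that blocks in its continuation, SaveAsync returns Task<string> so the result can be printed, and the gate event is made up for the demo.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class DatabaseFacade : IDisposable
{
    private readonly BlockingCollection<(string item, TaskCompletionSource<string> result)> _queue =
        new BlockingCollection<(string item, TaskCompletionSource<string> result)>();
    private readonly Task _processItemsTask;

    public DatabaseFacade() => _processItemsTask = Task.Run(ProcessItems);

    public void Dispose() => _queue.CompleteAdding();

    public Task<string> SaveAsync(string command)
    {
        // Continuations of tcs.Task are queued instead of being inlined into SetResult.
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _queue.Add((item: command, result: tcs));
        return tcs.Task;
    }

    private async Task ProcessItems()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            // Waiting a bit to emulate some IO-bound operation
            await Task.Delay(100);
            item.result.SetResult("OK");
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        using (var facade = new DatabaseFacade())
        {
            var gate = new ManualResetEventSlim();

            // Stand-in for the logger: its continuation blocks whichever thread runs it.
            async Task BlockingClient()
            {
                await facade.SaveAsync("first");
                gate.Wait();
            }

            Task client = BlockingClient();

            string result = await facade.SaveAsync("second");
            Console.WriteLine($"second: {result}");

            gate.Set();
            await client;
        }
    }
}
```

With the default TaskCompletionSource constructor, the blocking continuation would run inside SetResult on the facade's processing loop and the second request would never be completed; with the flag, the program prints "second: OK" and exits.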
Conclusion
- The TaskCompletionSource class was introduced in .NET 4.0, in the pre-async era, for controlling a task’s lifetime manually.
- By default all the task’s continuations are executed synchronously unless the TaskCreationOptions.RunContinuationsAsynchronously option is specified.
- All the “async” continuations (blocks between await statements) always run in a thread of an awaited task.
- TaskCompletionSource instances created with the default constructor may cause deadlocks and other threading issues by running all “async” continuations in the thread that sets the result of a task.
- If you use .NET 4.6.1+ you should always provide TaskCreationOptions.RunContinuationsAsynchronously when creating TaskCompletionSource instances.
This issue also arises when using Lazy<Task<T>> objects for caching and then having multiple threads await the same task. The continuations will all run synchronously in the same thread! Here are two related StackOverflow questions:
Multiple await on same task may cause blocking
Async threadsafe Get from MemoryCache