I think almost every real-world project uses some form of a producer-consumer queue. The idea behind the problem is very simple: an application needs to decouple the producers of some data from the consumers that process it. Consider, for instance, the thread pool from the CLR: an application can schedule work using ThreadPool.QueueUserWorkItem, and the thread pool does its best to maximize throughput by using an optimal number of threads to process the input data.
But using the thread pool is not always possible or appropriate. Even though you can control the minimum and maximum number of threads in the thread pool, that configuration affects the entire application, not specific parts of it.
There are numerous possible solutions to the producer-consumer problem. You can implement an entirely custom solution, with application logic intermixed with low-level threading aspects. It can be something semi-custom, like a set of tasks that deal with a shared BlockingCollection. Or it could be a simple solution based on an existing component, like the ActionBlock<T> from TPL Dataflow.
Today we’re going to explore the internals of the ActionBlock<T>, discuss the design decisions its authors made, and learn why you need to know them in order to avoid some weird behavior. Ready? Let’s go.
In my current project we have several cases where we need to solve the producer-consumer problem. Here is one of them: we have a parser and a custom interpreter for a TypeScript-like language. Without digging too deep into the details, you can assume that we just need to parse a set of files and build what is called a ‘transitive closure’ of all their dependencies.
Roughly, the logic is the following:
- Parse the file
- Analyze the file to understand what the dependencies are.
- Resolve the dependencies (i.e. figure out which TypeScript files are required by this one by analyzing ‘import * from’, ‘require’, etc.).
- Schedule all the dependencies for parsing.
Pretty simple, right? Actually it is, and here you can see how the logic could be implemented using TPL Dataflow and ActionBlock<T>:
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

private static Task<ParsedFile> ParseFileAsync(string path)
{
    Console.WriteLine($"Parsing '{path}'. Thread Id - {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(10);
    return Task.FromResult(
        new ParsedFile()
        {
            FileName = path,
            Dependencies = GetFileDependencies(path),
        });
}

static void Main(string[] args)
{
    long numberOfProcessedFiles = 0;
    ActionBlock<string> actionBlock = null;
    Func<string, Task> processFile = async path =>
    {
        Interlocked.Increment(ref numberOfProcessedFiles);
        ParsedFile parsedFile = await ParseFileAsync(path);

        foreach (var dependency in parsedFile.Dependencies)
        {
            Console.WriteLine($"Sending '{dependency}' to the queue... Thread Id - {Thread.CurrentThread.ManagedThreadId}");
            await actionBlock.SendAsync(dependency);
        }

        if (actionBlock.InputCount == 0)
        {
            // This is a marker that this is the last file and there
            // is nothing left to process.
            actionBlock.Complete();
        }
    };

    actionBlock = new ActionBlock<string>(processFile);
    actionBlock.SendAsync("FooBar.ts").GetAwaiter().GetResult();

    Console.WriteLine("Waiting for the action block to finish...");
    actionBlock.Completion.GetAwaiter().GetResult();

    Console.WriteLine($"Done. Processed {numberOfProcessedFiles}");
    Console.ReadLine();
}
Let’s discuss what is going on here. For the sake of simplicity, all the logic resides in the Main method. numberOfProcessedFiles is used to check that the logic is correct and we’re not missing files because of race conditions. The main part is the processFile delegate that is provided to the ActionBlock during construction. This delegate acts as ‘consumer’ and ‘producer’ at the same time: it gets a path from the queue, parses the file, schedules new items by calling actionBlock.SendAsync, and then, if the input queue is empty, signals that all the files were processed by calling actionBlock.Complete() (*). The Main method then creates an ActionBlock, initiates the process by sending the first file, and waits for completion.
ParseFileAsync fakes the parsing process and computes the dependencies using the following logic: file ‘foo.ts’ depends on ‘fo.ts’, which depends on ‘f.ts’, and so on. Basically, each file depends on the same file with a shorter name. This is a very naïve rule, but it helps to show the process.
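The snippet above references ParsedFile and GetFileDependencies without showing them. Here is a minimal sketch matching the logic just described (the exact shape of these members in the real code is an assumption):

private class ParsedFile
{
    public string FileName { get; set; }
    public List<string> Dependencies { get; set; }
}

private static List<string> GetFileDependencies(string path)
{
    // Each file depends on the same file name with one character removed:
    // 'foo.ts' -> 'fo.ts' -> 'f.ts' -> no dependencies.
    string name = Path.GetFileNameWithoutExtension(path);
    return name.Length <= 1
        ? new List<string>()
        : new List<string> { name.Substring(0, name.Length - 1) + ".ts" };
}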
ActionBlock provides an easy-to-use API and handles concurrency for you. The only caveat here is that the default ‘degree of parallelism’ is 1, and you have to override the default by passing ExecutionDataflowBlockOptions to the constructor of the ActionBlock. If the MaxDegreeOfParallelism property is greater than 1, the ActionBlock will call the callback function from different threads (actually, from different tasks) to process multiple incoming items at the same time.
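For instance, here is a minimal sketch (reusing the processFile delegate from above; the number 4 is arbitrary) that lets up to four files be processed concurrently:

var parallelOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4, // up to 4 concurrent callback invocations
};
actionBlock = new ActionBlock<string>(processFile, parallelOptions);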
Post vs. SendAsync, or what to use when
Everyone who has tried to solve the producer-consumer problem has faced a dilemma: what to do when the incoming flow exceeds the ability to process it? How to ‘throttle back’? Keep every incoming item in memory and grow the queue indefinitely? Throw an exception? Return ‘false’? Block the ‘Enqueue’ method while the queue is full?
To solve this problem, the ActionBlock authors decided to use the following well-known pattern:
- The client of an action block may provide a maximum queue size in the constructor.
- When the queue is full, the Post method returns false and the SendAsync method “blocks” until the queue gets a free spot (**). Both behaviors are sketched right after this list.
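Here is a minimal sketch of both strategies against a bounded block (the block, the items and the delay are placeholders, and the code is assumed to run inside an async method):

var block = new ActionBlock<int>(
    async n => await Task.Delay(100),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Synchronous: returns false right away if the block is full (or completed).
bool accepted = block.Post(42);

// Asynchronous: the returned task completes only once the block
// has accepted (or permanently declined) the item.
bool sent = await block.SendAsync(43);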
Our first example doesn’t specify any limit for the queue. This means that if the producer pushes hard enough, the application can end up with an OutOfMemoryException. Let’s change that. For the sake of this example, let’s set the queue size to some ridiculously low number, like … 1.
actionBlock = new ActionBlock<string>(
    processFile,
    new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
Now, if we run this code, we’ll get … a deadlock!
Deadlock
Let’s think about a producer-consumer queue from the design point of view. Suppose you’re building your own custom implementation of a queue that takes a callback for processing elements. You need to decide whether the queue should be bounded or not. If the queue is bounded, you will end up with a set of methods similar to what ActionBlock has: a synchronous API for adding elements that returns false when the queue is full, and an asynchronous version that returns a task. When the queue is full, the client of the queue can decide what to do: handle the ‘overflow’ manually using the synchronous version, or ‘await’ the task returned by the asynchronous version.
Then you need to decide when a given callback should be called. You may end up with the following logic: check the queue size; if the queue is not empty, pick an element from it, call the callback, wait for it to finish, and then remove the item from the queue. (A real implementation would be way more complicated, because it has to consider all the possible race conditions.) You could remove the item from the queue before calling the callback, but as we’ll see in a moment, that won’t eliminate the possibility of a deadlock.
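In pseudo-C#, the dispatch logic of such a queue might look roughly like this (a sketch only; ‘items’ and ‘callback’ are hypothetical fields, and all race conditions and completion handling are ignored):

while (true)
{
    if (items.Count != 0)
    {
        T item = items.Peek();  // the item still occupies a spot in the queue
        await callback(item);   // run the user-provided callback to completion
        items.Dequeue();        // only now is the spot freed
    }
}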
This design is clean and simple, but it can cause a problem very easily. Suppose the queue is full and it is currently running a callback to process an element. And instead of quickly processing the element, that callback tries to schedule another item by awaiting SendAsync:
The queue is full and can’t accept a new element because the callback is still in progress. But the callback itself is stuck awaiting SendAsync. A classic deadlock!
OK. We’re getting a deadlock because the ActionBlock removes elements only *after* the callback has finished. Let’s consider an alternative: what if the ActionBlock removed an item *before* calling the callback?
Actually, a deadlock would still be possible. Consider an ActionBlock with a bounded capacity of 1 and a degree of parallelism of 2.
- Thread T1 adds an element to the queue. The ActionBlock removes the item and calls the callback.
- Thread T2 adds an element to the queue. The ActionBlock removes the item and calls the callback.
- Thread T1 adds another element to the queue. The ActionBlock can’t call the callback, because the degree of parallelism is 2 and both slots are busy. The queue is now full.
- Callback 1 tries to add an item to the queue using ‘await SendAsync’ and gets stuck, because the queue is full.
- Callback 2 tries to add an item to the queue using ‘await SendAsync’ and gets stuck, because the queue is full.
This means that removing elements before the callback won’t solve the problem. In fact, it would make things even worse, because the probability of a deadlock would be lower (you’d need N callbacks that schedule additional work, where N is the degree of parallelism), making the bug much harder to reproduce. Another drawback of this approach is more subtle: the ActionBlock is not a general-purpose producer-consumer queue. The class implements ITargetBlock<TInput> and can be used in more complicated dataflows. For instance, you may use one BufferBlock<T> with more than one target action block to process items in parallel. With the existing behavior, a full action block won’t accept more elements from the source, leaving other blocks in the chain the ability to process the item immediately instead.
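For example, here is a sketch of such a dataflow (‘Process’ is a hypothetical placeholder): when worker1 is full it postpones incoming items, and the buffer offers them to worker2 instead.

var buffer = new BufferBlock<string>();
var boundedOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
var worker1 = new ActionBlock<string>(s => Process(s), boundedOptions);
var worker2 = new ActionBlock<string>(s => Process(s), boundedOptions);
// A full target declines new items, so the buffer can offer
// each item to the other linked block instead.
buffer.LinkTo(worker1);
buffer.LinkTo(worker2);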
If an item were removed from the queue before calling the callback, the effective ‘size’ of the queue would be BoundedCapacity + MaxDegreeOfParallelism, which is much harder to reason about.
How to solve the problem?
UPDATE: in the original post, I proposed a solution based on wrapping the call to the SendAsync method in Task.Run. But that solution just moves the problem to a different TPL queue, and if that queue fills up, the same issue occurs.
I don’t think there is a real solution to this problem. If you need to bound the queue’s capacity and the callback can schedule more work, then the ActionBlock is just not the right tool. In that case you either have to give up the bounded capacity, or implement the producer-consumer pattern using different building blocks, for instance a BlockingCollection with a manually controlled set of workers.
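A minimal sketch of that alternative (ProcessItem, the capacity and the worker count are all hypothetical; completion handling is omitted for brevity):

var queue = new BlockingCollection<string>(boundedCapacity: 100);

Task[] workers = Enumerable.Range(0, 4)
    .Select(_ => Task.Run(() =>
    {
        foreach (var item in queue.GetConsumingEnumerable())
        {
            // The worker owns the item at this point and can decide
            // explicitly what to do when the queue is full (e.g. use
            // queue.TryAdd with a timeout) instead of blocking forever.
            ProcessItem(item, queue);
        }
    }))
    .ToArray();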
Degree of Parallelism
Unlike the primitives from the TPL, all the blocks from TPL Dataflow are single-threaded by default. It means that an ActionBlock will process items one by one on a single thread, a TransformBlock will transform items one by one on a single thread, and so on. The reason for this is simplicity: it is much easier to reason about dataflow graphs when there is no concurrency involved.
To enable parallelism, you should provide an instance of ExecutionDataflowBlockOptions in the constructor with the MaxDegreeOfParallelism property set to a value greater than 1. By the way, setting this property to DataflowBlockOptions.Unbounded (i.e. -1) enables an ‘unbounded’ mode in which the ActionBlock will create as many tasks as possible, their number limited only by the TaskScheduler that you may also provide at construction time.
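For example, a sketch of the ‘unbounded’ configuration (reusing processFile from the first example):

var unboundedOptions = new ExecutionDataflowBlockOptions
{
    // Equivalent to -1: as many concurrent tasks as the scheduler allows.
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler = TaskScheduler.Default, // or any custom scheduler
};
actionBlock = new ActionBlock<string>(processFile, unboundedOptions);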
Conclusion
Designing an easy-to-use component is hard. Designing an easy-to-use component that deals with concurrency for you is even harder. The best way to use one correctly is to know how it is implemented and what restrictions its design team had in mind.
ActionBlock<T> is a great type that drastically simplifies the most common producer-consumer scenarios. But even in this case, in order to use it correctly you should know some key aspects of TPL Dataflow, like the default degree of parallelism, the behavior of bounded blocks, and the idea of work-item ownership.
—–
(*) This example is not 100% thread safe, and a full-blown implementation should not rely on actionBlock.InputCount. Can you spot the issue?
(**) The Post method returns false in one of two cases: the queue is full, or the queue is completed. This can be confusing, because a single value indicates two different conditions. The SendAsync method, on the other hand, is different: the returned Task<bool> won’t complete while the queue is full, and task.Result will be false only if the queue can’t process elements anymore.