Partitioning in PLINQ
Every PLINQ query that can be parallelized starts with the same step: partitioning. Some queries may even need to repartition in the middle. Partitioning is a fairly simple concept at a high level: PLINQ takes a lock on the input data source, breaks it into multiple pieces, and then distributes these to the available processing cores on the machine. Each core then processes its piece of the data as appropriate, and the results from the partitions are merged, aggregated, or have a function executed over them.
Example query: (from x in D.AsParallel() where p(x) select x*x*x).Sum();
Here’s a simple way to look at it. On a 4-core machine, take 4 million elements, divide them into 4 partitions of 1 million elements each, and give each of the 4 cores a million elements of data to process. Assuming that the data and the processing of the data are uniform, that all of the cores operate with the same effectiveness, that nothing else is using the cores, and that we can access all of the elements directly rather than only being able to access element N after accessing element N-1 (i.e. indexing rather than iterating), this is an efficient algorithm for some straightforward types of queries. If you just counted those assumptions, you can see there are a lot of factors that PLINQ takes into account when processing a query — and some considerations didn’t even show up as assumptions in that example. How is the data ordered? Is there contention within the query itself? Do we know how much data is in the query? On and on, the considerations continue. Meanwhile, we’re trying to design a multi-purpose system that can handle any of an infinite number of query shapes that you can throw at it. (You might wonder how we test all of that. Ask us about SLUG someday.)
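To make that division of labor concrete, here is a minimal Python sketch (illustrative only — the function name and the remainder-spreading policy are our own, not PLINQ internals) of splitting n elements into p contiguous index ranges:

```python
def range_partitions(n, p):
    """Split the index range [0, n) into p contiguous, near-equal pieces.

    Illustrative only: PLINQ does something like this internally for
    indexable sources; the remainder-spreading detail is an assumption.
    """
    base, extra = divmod(n, p)
    parts, start = [], 0
    for i in range(p):
        size = base + (1 if i < extra else 0)  # first `extra` parts get one more
        parts.append((start, start + size))
        start += size
    return parts

# 4 million elements across 4 cores -> four ranges of 1,000,000 each
print(range_partitions(4_000_000, 4))
```

Because each worker knows its (start, end) pair up front, no coordination is needed while the partitions are processed.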
Needless to say, we need to take a look over the query and the data source to make these decisions. And we want to make these decisions fast, because the more time we spend making them, the more time we have wasted that could have been spent processing the data. There are a lot of things to balance here as we try to decide what best to do.
Based on many factors, we have 4 primary algorithms that we use for partitioning alone. They’re worth getting to know, because we’ll talk more about them and tweaks that we make to them in future technology discussions.
1. Range Partitioning – This is a pretty common partitioning scheme, similar to the one described in the example above. It is amenable to many query shapes, though it only works with indexable data sources such as lists and arrays (i.e. IList<T> and T[]). If you give PLINQ something typed as IEnumerable or IEnumerable<T>, PLINQ will query for the IList<T> interface, and if it’s found, will use that interface implementation with range partitioning. The benefit of these data sources is that we know the exact length and can access any of the elements directly. For the majority of cases, there are large performance benefits to using this type of partitioning.
2. Chunk Partitioning – This is a general-purpose partitioning scheme that works for any data source, and is the main partitioning type for non-indexable data sources. In this scheme, worker threads request data, and it is served up to each thread in chunks. IEnumerable and IEnumerable<T> do not have a fixed Count property (there is a LINQ extension method for this, but that is not the same), so there’s no way to know when or whether the data source will enumerate completely. It could be 3 elements, it could be 3 million elements, it could be infinite. A single system needs to take all of these possibilities into account and factor in different delegate sizes, uneven delegate sizes, selectivity, etc. The chunk partitioning algorithm is quite general, and PLINQ’s algorithm had to be tuned for good performance on a wide range of queries. We’ve experimented with many different growth patterns and currently use a plan that doubles the chunk size after a certain number of requests. This is subject to change as we tune for performance, so don’t depend on it. Another important benefit is that chunk partitioning balances the load among cores, since the workers dynamically request more work as needed. This ensures that all cores are utilized throughout the query and can all cross the finish line at about the same time, rather than straggling in one after another.
3. Striped Partitioning – This scheme is used for SkipWhile and TakeWhile and is optimized for processing items at the head of a data source, which is exactly what those operators need. In striped partitioning, each of the n worker threads is allocated a small number of items (sometimes 1) from each block of items. The set of items belonging to a single thread is called a ‘stripe’, hence the name. A useful feature of this scheme is that no inter-thread synchronization is required: each worker thread can determine its data via simple arithmetic. This is really a special case of range partitioning, and it only works on arrays and types that implement IList<T>.
4. Hash Partitioning – Hash partitioning is a special type of partitioning used by the query operators that must compare data elements (these operators are: Join, GroupJoin, GroupBy, Distinct, Except, Union, and Intersect). When hash partitioning occurs (just prior to any of the operators mentioned), every element is assigned to an output partition based on a hash computed from its key, so items with identical hash codes are channeled to the same thread. This partitioning work is costly, but it means that all of the comparison work can then be performed without further synchronization. It is also an efficient way of building a hash table on the fly, concurrently, and it accomplishes both partitioning and hashing for the hash join algorithm: PLINQ can use the same hash partitioning scheme for the data source used for probing, so all possible matches end up in the same partition, meaning less shared data and smaller hash tables (each partition has its own). There’s a lot going on with hash partitioning, so it’s not as speedy as the other types, especially when ordering is involved in the query; as a result, the query operators that rely upon it have additional overheads compared to simpler operators.
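To make the chunk-growth idea in (2) concrete, here is a hedged Python sketch: a single shared enumerator served out in chunks whose size doubles periodically. The growth constants here are invented for illustration; PLINQ’s actual tuning differs and, as noted above, is subject to change.

```python
import itertools

def serve_in_chunks(source, initial_size=1, requests_per_size=2, max_size=1024):
    """Yield lists of items from `source`, doubling the chunk size every
    `requests_per_size` requests. Constants are illustrative, not PLINQ's.

    Small chunks at the start keep short inputs balanced across workers;
    growing chunks amortize the per-request cost on long inputs.
    """
    it = iter(source)
    size, requests = initial_size, 0
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            return  # source exhausted
        yield chunk
        requests += 1
        if requests % requests_per_size == 0:
            size = min(size * 2, max_size)

print([len(c) for c in serve_in_chunks(range(20))])
# chunk sizes start small and grow as requests keep coming
```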
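The “simple arithmetic” behind the striped scheme in (3) can be sketched like this (a hypothetical helper, not PLINQ’s implementation): with a stripe width of 1 and p workers, worker i simply takes items i, i+p, i+2p, and so on.

```python
def stripe_indices(n, workers, worker_id, stripe=1):
    """Indices owned by `worker_id` when each of `workers` threads takes
    `stripe` consecutive items out of every block of workers * stripe items.

    No locks needed: every worker computes its own indices independently.
    """
    block = workers * stripe
    out = []
    for block_start in range(worker_id * stripe, n, block):
        out.extend(range(block_start, min(block_start + stripe, n)))
    return out

print(stripe_indices(8, 4, 0))  # worker 0 of 4, stripe width 1 -> [0, 4]
```

Note how every worker starts with an item near the head of the source, which is why this layout suits SkipWhile and TakeWhile.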
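Finally, a minimal sketch of (4), again in illustrative Python rather than PLINQ internals: route every element by the hash of its key, so that equal keys (whether from the build side or the probe side of a join) always land in the same partition. The helper name and the first-letter join key are made up for the example.

```python
from collections import defaultdict

def hash_partition(items, key, num_partitions):
    """Assign each element to a partition by hashing its key.

    Equal keys always hash equally, so two inputs partitioned this way
    only need to be matched within the same partition, each with its own
    small hash table and no cross-thread synchronization.
    """
    partitions = defaultdict(list)
    for item in items:
        partitions[hash(key(item)) % num_partitions].append(item)
    return partitions

by_initial = lambda w: w[0]  # hypothetical join key: first letter
left = hash_partition(["apple", "avocado", "banana", "cherry"], by_initial, 4)
right = hash_partition(["apricot", "blueberry", "cranberry"], by_initial, 4)
# "apple" and "apricot" share a key, so they share a partition number
```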
Hopefully that gives you a little better understanding of what goes on under the covers. As we write more about the system, including performance tips and tricks, this should serve as a basis for understanding how things work. If you can structure your applications to use specific types of partitioning, you may see greater performance gains by leveraging the more efficient partitioning algorithms.