(The full set of ParallelExtensionsExtras Tour posts is available here.)
The new .NET 4 System.Threading.ThreadLocal<T> is quite useful when you need per-thread, per-instance storage. This is in contrast to the fast ThreadStaticAttribute, which supports only per-thread storage (in .NET 4, ThreadLocal<T> actually layers on top of ThreadStaticAttribute to provide the additional per-instance behavior).
ThreadLocal<T> exposes through its Value property the thread-local value of the variable for the current thread. However, it doesn’t expose in any way a collection of all of the thread-local values that have been created on that ThreadLocal<T> instance. Having that functionality can be useful in a variety of circumstances. For example, if you use ThreadLocal<T> to store per-thread instances of some heavyweight and disposable object, you might want to dispose of all of the instances when you’re done using them. Or if you were implementing a reduction, doing a local reduction on each thread involved in the computation, you would need access to all of the thread-local values at the end in order to do the final cross-thread reduction.
This extra functionality is quite similar to the combinable type provided in Visual C++ 10. And although ThreadLocal<T> doesn’t provide this functionality out-of-the-box, we can easily layer it on top of it. This is precisely what the ReductionVariable<T> type in ParallelExtensionsExtras provides (see the ReductionVariable.cs file).
The ReductionVariable<T> type is very similar to ThreadLocal<T> in surface area, the key difference being that it adds a Values property (plural) that’s an IEnumerable<T> in addition to exposing the familiar Value property (singular) that returns a T:
public sealed class ReductionVariable<T>
{
public ReductionVariable(Func<T> seedFactory);
public T Value { get; }
public IEnumerable<T> Values { get; }
… // a few additional public helper methods
}
The ReductionVariable<T> instance maintains three private fields:
private readonly Func<T> _seedFactory;
private readonly ThreadLocal<StrongBox<T>> _threadLocal;
private readonly ConcurrentQueue<StrongBox<T>> _values;
The Func<> is the user-provided delegate used to initialize a thread’s local value when one hasn’t been initialized yet (if the seed factory is null, the default value of T will be used). The ThreadLocal<> is the thread-local storage, and the ConcurrentQueue<> is used to store all of the values from all of the threads.
Because we need to maintain a list of all of the thread-local values, and because the thread-local values might change over time, we need to wrap the values with an object we can always use to reference each of them. For that purpose, we utilize the System.Runtime.CompilerServices.StrongBox<T> type, which is simply an object that contains a field of type T. Then, instead of using a ThreadLocal<T>, we utilize a ThreadLocal<StrongBox<T>>, and instead of using a ConcurrentQueue<T>, we use a ConcurrentQueue<StrongBox<T>>. In this manner, the Value field of the StrongBox instances may change repeatedly, but the StrongBox<T> object reference will remain constant. From this, the rest of the implementation falls into place.
The constructor initializes the ThreadLocal<StrongBox<T>> to create a StrongBox<T> that wraps a newly created seed value and adds that new StrongBox<T> instance to the queue of values.
public ReductionVariable(Func<T> seedFactory)
{
_seedFactory = seedFactory;
_threadLocal = new ThreadLocal<StrongBox<T>>(CreateValue);
}
private StrongBox<T> CreateValue()
{
var s = new StrongBox<T>(
_seedFactory != null ? _seedFactory() : default(T));
_values.Enqueue(s);
return s;
}
The Value property then need only get and set the boxed value’s value:
public T Value
{
get { return _threadLocal.Value.Value; }
set { _threadLocal.Value.Value = value; }
}
And the Values property need only return the contents of the list. Of course, we don’t want to return the StrongBox<T> instances, but rather the values they contain, so we’ll use LINQ’s Select method to select just the values:
public IEnumerable<T> Values
{
get { return _values.Select(s => s.Value); }
}
With our ReductionVariable<T> in place, we can now utilize it to perform reductions, to track and clean up after thread-local state, and more. Here’s an example:
var edos = new ReductionVariable<ExpensiveDisposableObject>(
() => new ExpensiveDisposableObject());
var q = (from d in dataSource.AsParallel
select ProcessValueWithExpensiveObject(d, edos.Value)).ToArray();
foreach(var edo in edos) edo.Dispose();
0 comments