In my last post, we looked at building an AsyncLock in terms of an AsyncSemaphore. In this post, we’ll build a more advanced construct, an asynchronous reader/writer lock.
An asynchronous reader/writer lock is more complicated than any of the previous coordination primitives we’ve created. It also involves more policy, meaning there are more decisions to be made about how exactly the type should behave. For the purposes of this example, I’ve made a few decisions. First, writers take precedence over readers. This means that regardless of the order in which read or write requests arrive, if a writer is waiting, it will get priority over any number of waiting readers, even if it arrived later than those readers. Second, I’ve decided not to throttle readers, meaning that all waiting readers will be released as soon as there are no writers outstanding or waiting. Both of those points could be debated based on the intended usage of the type, so you might choose to modify the implementation based on your needs.
Here’s the shape of the type we’ll build:
public class AsyncReaderWriterLock
{
    public AsyncReaderWriterLock();

    public Task<Releaser> ReaderLockAsync();
    public Task<Releaser> WriterLockAsync();

    public struct Releaser : IDisposable
    {
        public void Dispose();
    }
}
As with the AsyncLock, we’ll utilize a disposable Releaser to make it easy to use this type in a scoped manner, e.g.
private readonly AsyncReaderWriterLock m_lock = new AsyncReaderWriterLock();
…
using (var releaser = await m_lock.ReaderLockAsync())
{
    … // protected code here
}
This Releaser is almost identical to that used in AsyncLock, except that we’re using the same type to represent both readers and writers, and since we need to behave differently based on which kind of lock is being released, I’ve parameterized the Releaser accordingly:
public struct Releaser : IDisposable
{
    private readonly AsyncReaderWriterLock m_toRelease;
    private readonly bool m_writer;

    internal Releaser(AsyncReaderWriterLock toRelease, bool writer)
    {
        m_toRelease = toRelease;
        m_writer = writer;
    }

    public void Dispose()
    {
        if (m_toRelease != null)
        {
            if (m_writer) m_toRelease.WriterRelease();
            else m_toRelease.ReaderRelease();
        }
    }
}
In terms of member variables, I need several more for this type than I’ve needed for the other data structures previously discussed. First, we will have fast paths in this type, so I want to cache a Task<Releaser> for reader waits that complete immediately, and one for writer waits that complete immediately.
private readonly Task<Releaser> m_readerReleaser;
private readonly Task<Releaser> m_writerReleaser;
These members will be initialized in the constructor:
public AsyncReaderWriterLock()
{
    m_readerReleaser = Task.FromResult(new Releaser(this, false));
    m_writerReleaser = Task.FromResult(new Releaser(this, true));
}
Next, I need to maintain a queue of writer waiters, one TaskCompletionSource<Releaser> for each, since I need to be able to wake them individually. I also need a TaskCompletionSource<Releaser> for my readers; however, for the readers, per our previously discussed design, when it’s time to allow a reader to run, I can allow them all to run, and therefore I just need a single TaskCompletionSource<Releaser> that all of the readers in a given group will wait on. However, since I’m maintaining a single TaskCompletionSource<Releaser> for all readers, I also need to maintain a count of how many readers are waiting, so that when I eventually wake them all, I can keep track of all of their releases and know when there are no more outstanding readers.
private readonly Queue<TaskCompletionSource<Releaser>> m_waitingWriters =
    new Queue<TaskCompletionSource<Releaser>>();
private TaskCompletionSource<Releaser> m_waitingReader =
    new TaskCompletionSource<Releaser>();
private int m_readersWaiting;
Finally, I need a variable to maintain the current status of the lock. This will be an integer, where a value of 0 means that no one has acquired the lock, a value of -1 means that a writer has acquired the lock, and a positive value means that one or more readers have acquired the lock, where the positive value indicates how many.
private int m_status;
We now have four methods to implement: ReaderLockAsync, ReaderRelease, WriterLockAsync, and WriterRelease.
ReaderLockAsync is used when a new reader wants in. After acquiring the lock on m_waitingWriters (which we’ll use across all four of these methods to ensure data consistency), we need to determine whether the reader should be allowed in immediately or should be forced to wait. Based on the policy described earlier, if there are currently no writers active or waiting, then this reader can be allowed in immediately; in that case, we increment the status (which would have either been 0, meaning no activity on the lock, or positive, meaning there are currently readers) and we return the cached reader releaser. If, however, there was an active or waiting writer, then we need to force the reader to wait, which we do by incrementing the count of the number of readers waiting, and return the m_waitingReader task (or, rather, a continuation off of the reader task, ensuring that all awaiters will be able to run concurrently rather than getting serialized).
public Task<Releaser> ReaderLockAsync()
{
    lock (m_waitingWriters)
    {
        if (m_status >= 0 && m_waitingWriters.Count == 0)
        {
            ++m_status;
            return m_readerReleaser;
        }
        else
        {
            ++m_readersWaiting;
            return m_waitingReader.Task.ContinueWith(t => t.Result);
        }
    }
}
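To make the shared-task idea concrete, here is a minimal sketch of the fan-out on its own, outside the lock. It assumes a recent C# compiler (top-level statements), and it uses a string as a stand-in for Releaser; the names waitingReader and readers are purely illustrative and not part of the type above. Each waiter gets its own continuation task off the one TaskCompletionSource, and a single SetResult completes them all.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// One completion source shared by every waiting "reader" (string stands in for Releaser).
var waitingReader = new TaskCompletionSource<string>();

// Each reader is handed its own continuation task rather than the shared task itself.
Task<string>[] readers = Enumerable.Range(0, 3)
    .Select(_ => waitingReader.Task.ContinueWith(t => t.Result))
    .ToArray();

// Nothing has been released yet, so no reader task can have completed.
bool anyCompletedEarly = readers.Any(r => r.IsCompleted);
Console.WriteLine(anyCompletedEarly); // False

// A single SetResult releases every reader in the group.
waitingReader.SetResult("released");
string[] results = await Task.WhenAll(readers);
Console.WriteLine(string.Join(", ", results)); // released, released, released
```

In the lock itself, the same pattern is why WriterRelease has to swap in a fresh TaskCompletionSource after completing the current one: a completed source cannot be reused for the next group of readers.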
WriterLockAsync is used when a new writer wants in. As with ReaderLockAsync, there are two cases to deal with: when the writer can be allowed in immediately, and when the writer must be forced to wait. The only time a writer can be allowed in immediately is when the lock is currently not being used at all; since a writer must be exclusive, it can’t run when there are any active readers or active writers. So, if m_status is 0, we change the status to indicate that there’s now an active writer, and we return the cached writer releaser. Otherwise, we create a new TaskCompletionSource<Releaser> for this writer, queue it, and return its Task.
public Task<Releaser> WriterLockAsync()
{
    lock (m_waitingWriters)
    {
        if (m_status == 0)
        {
            m_status = -1;
            return m_writerReleaser;
        }
        else
        {
            var waiter = new TaskCompletionSource<Releaser>();
            m_waitingWriters.Enqueue(waiter);
            return waiter.Task;
        }
    }
}
Now we need to write the release functions, which are called when an active reader or writer completes its work and wants to release its hold on the lock. ReaderRelease needs to decrement the count of active readers, and then check the current state of the lock. If it was the last active reader and there are now writers waiting, then it needs to wake one of those writers and mark that the lock now has an active writer. We don’t need to check for any pending readers; if there are any writers, then they’d take priority anyway, and if there aren’t any pending writers, then any readers that had arrived would have been allowed in immediately.
private void ReaderRelease()
{
    TaskCompletionSource<Releaser> toWake = null;

    lock (m_waitingWriters)
    {
        --m_status;
        if (m_status == 0 && m_waitingWriters.Count > 0)
        {
            m_status = -1;
            toWake = m_waitingWriters.Dequeue();
        }
    }

    if (toWake != null)
        toWake.SetResult(new Releaser(this, true));
}
Finally, we need our WriterRelease method. When a writer completes, if there are any pending writers waiting to get in, we simply dequeue and complete one of their tasks (we don’t need to update the lock’s status, since there will still be a single active writer, with one having completed and a new one having taken its place). If there aren’t any writers, but there are readers waiting, then we can complete the single task on which all of those readers are waiting; in that case, we also need to create a new single task for all subsequent readers to wait on, and we need to update our status accordingly to now indicate how many active readers there are. If there weren’t any writers or readers waiting, then we can simply reset the lock’s status.
private void WriterRelease()
{
    TaskCompletionSource<Releaser> toWake = null;
    bool toWakeIsWriter = false;

    lock (m_waitingWriters)
    {
        if (m_waitingWriters.Count > 0)
        {
            toWake = m_waitingWriters.Dequeue();
            toWakeIsWriter = true;
        }
        else if (m_readersWaiting > 0)
        {
            toWake = m_waitingReader;
            m_status = m_readersWaiting;
            m_readersWaiting = 0;
            m_waitingReader = new TaskCompletionSource<Releaser>();
        }
        else m_status = 0;
    }

    if (toWake != null)
        toWake.SetResult(new Releaser(this, toWakeIsWriter));
}
That’s it. Now, a production implementation of such a lock would likely want to be better instrumented, throw exceptions for erroneous usage (e.g. releasing when there wasn’t anything to be released), and so forth, but this should give you a basic sense of how such an asynchronous reader/writer lock could be implemented.
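As one illustration of what “throw exceptions for erroneous usage” might look like, here is a hedged sketch that models only the status bookkeeping, using the same encoding as m_status (0 for free, -1 for a writer, a positive value for the reader count). It is not the lock itself: the waking logic and the lock on m_waitingWriters are deliberately left out, the local variable status is a stand-in for the field, and the choice of InvalidOperationException and the message text are my assumptions. It is written with top-level statements, so it assumes a recent C# compiler.

```csharp
using System;

// Stand-in for m_status: 0 = free, -1 = held by a writer, >0 = number of active readers.
int status = 0;

// A reader release is only valid while at least one reader holds the lock.
void ReaderRelease()
{
    if (status <= 0)
        throw new InvalidOperationException("No reader currently holds the lock.");
    --status;
}

// A writer release is only valid while a writer holds the lock.
void WriterRelease()
{
    if (status != -1)
        throw new InvalidOperationException("No writer currently holds the lock.");
    status = 0;
}

// Two readers enter and leave; a third release has nothing to release.
status = 2;
ReaderRelease();
ReaderRelease();

string error = null;
try { ReaderRelease(); }
catch (InvalidOperationException e) { error = e.Message; }
Console.WriteLine(error); // No reader currently holds the lock.

// A writer enters and leaves normally.
status = -1;
WriterRelease();
Console.WriteLine(status); // 0
```

In the real type, checks like these would sit inside the existing lock blocks of ReaderRelease and WriterRelease, before any state is changed.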
Before I conclude, it’s worth highlighting that .NET 4.5 includes a related type: ConcurrentExclusiveSchedulerPair. I briefly discussed this type when describing what was new for parallelism in .NET 4.5 Developer Preview, but in short, it provides reader/writer-like scheduling for tasks (and it’s robust and has been well-tested, unlike the code in this post). Hanging off of an instance of ConcurrentExclusiveSchedulerPair are two TaskScheduler instances: ConcurrentScheduler and ExclusiveScheduler. These two schedulers collude to ensure that an “exclusive” (or writer) task may only run when no other task associated with the schedulers is running, and that one or more “concurrent” (or reader) tasks may run concurrently as long as there are no exclusive tasks. The type includes more advanced capabilities than what I’ve implemented in this post, for example being able to throttle readers (whereas in my AsyncReaderWriterLock in this post, all readers are allowed in as long as there are no writers).
You can use ConcurrentExclusiveSchedulerPair to build solutions similar to those for which you might use AsyncReaderWriterLock. For example, instead of wrapping the protected code in a block that awaits AsyncReaderWriterLock.WriterLockAsync on entrance and calls the returned Releaser’s Dispose on exit, you could instead await a task queued to the ConcurrentExclusiveSchedulerPair’s ExclusiveScheduler. ConcurrentExclusiveSchedulerPair also works well with systems layered on top of TPL and written in terms of TaskScheduler, like TPL Dataflow. You can, for example, create multiple ActionBlocks that all target the same ConcurrentScheduler instance configured with a maximum concurrency level, and all of those blocks will then collude to ensure they don’t go above that maximum.
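As a rough sketch of the “await a task queued to the ExclusiveScheduler” approach, the following uses two TaskFactory instances, one per scheduler. The variable names (exclusive, concurrent, counter) are illustrative, and the code assumes a recent C# compiler for top-level statements. The “writer” tasks mutate shared state without any further synchronization, relying on the exclusive scheduler to run them one at a time; the “reader” tasks run on the concurrent scheduler.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

var pair = new ConcurrentExclusiveSchedulerPair();
var exclusive = new TaskFactory(pair.ExclusiveScheduler);   // "writer" side
var concurrent = new TaskFactory(pair.ConcurrentScheduler); // "reader" side

int counter = 0; // shared state protected only by the scheduler pair

// Queue 100 exclusive tasks; none of them overlaps another, so a plain ++ is enough here.
Task[] writers = Enumerable.Range(0, 100)
    .Select(_ => exclusive.StartNew(() => { counter++; }))
    .ToArray();
await Task.WhenAll(writers);
Console.WriteLine(counter); // 100

// Readers may run concurrently with each other, but not with exclusive tasks.
int[] seen = await Task.WhenAll(
    concurrent.StartNew(() => counter),
    concurrent.StartNew(() => counter));
Console.WriteLine(string.Join(",", seen)); // 100,100
```

Note that these delegates are synchronous. Once a delegate contains an await, the exclusivity covers only each piece of the method between awaits.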
However, there is a key behavioral aspect of ConcurrentExclusiveSchedulerPair to keep in mind: it works at the level of a Task’s execution, and this may or may not be what you want. If you write code like the following:
Task.Factory.StartNew(async delegate
{
    … // code #1
    await SomethingAsync();
    … // code #2
}, CancellationToken.None, TaskCreationOptions.None, myExclusiveScheduler);
that could either end up resulting in one Task queued to the scheduler (if the Task returned from SomethingAsync was complete by the time we awaited it), or it could result in two Tasks queued to the scheduler (if the Task returned from SomethingAsync wasn’t yet complete by the time we awaited it). If it results in two tasks, then the atomicity provided by the exclusive scheduler applies to each task individually, not across them… in effect the exclusive lock can be released while awaiting. For certain scenarios, this is exactly what you want, and in particular for cases where you’re using the atomicity to provide consistency while accessing in-memory data structures and where you ensure that await calls do not come in the middle of such modifications. To achieve the same behavior with AsyncReaderWriterLock, you’d need to do something like the following:
Task t = null;
using (var releaser = await m_lock.WriterLockAsync())
{
    … // code #1
    t = SomethingAsync();
}
await t;
using (var releaser = await m_lock.WriterLockAsync())
{
    … // code #2
}
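To see the scheduler-side behavior for yourself, here is a small sketch under a few assumptions: it uses top-level statements (a recent C# compiler), and the names signal, log, a, and b are made up for the demo. Task A is queued to the exclusive scheduler and awaits a signal that only task B, queued to the same exclusive scheduler, can set. If exclusivity were held across the await, B could never run and the program would hang; because it is released at the await, B runs and A then resumes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var pair = new ConcurrentExclusiveSchedulerPair();
var log = new ConcurrentQueue<string>();
var signal = new TaskCompletionSource<bool>();

// Task A: its two halves run as separate units of work on the exclusive scheduler.
Task<Task> a = Task.Factory.StartNew(async () =>
{
    log.Enqueue("A: code #1");
    await signal.Task;          // exclusivity is not held while waiting here
    log.Enqueue("A: code #2");
}, CancellationToken.None, TaskCreationOptions.None, pair.ExclusiveScheduler);

// Task B: also exclusive; it can only run if A is not holding the scheduler.
Task b = Task.Factory.StartNew(() =>
{
    log.Enqueue("B");
    signal.SetResult(true);
}, CancellationToken.None, TaskCreationOptions.None, pair.ExclusiveScheduler);

// StartNew with an async delegate returns Task<Task>; Unwrap waits for the whole method.
await a.Unwrap();
await b;

string[] entries = log.ToArray();
Console.WriteLine(string.Join(" | ", entries)); // "A: code #2" is always the last entry
```

With AsyncReaderWriterLock, by contrast, the lock stays held across an await inside the using block unless you structure the code as in the two-block example above.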
That concludes my short series on building async coordination primitives. I hope you enjoyed it.