{"id":109725,"date":"2024-05-03T07:00:00","date_gmt":"2024-05-03T14:00:00","guid":{"rendered":"https:\/\/devblogs.microsoft.com\/oldnewthing\/?p=109725"},"modified":"2024-05-03T08:31:50","modified_gmt":"2024-05-03T15:31:50","slug":"20240503-00","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/oldnewthing\/20240503-00\/?p=109725","title":{"rendered":"Awaiting a set of handles with a timeout, part 4: Building our own awaiter"},"content":{"rendered":"<p>Last time, we figured out <a title=\"Awaiting a set of handles with a timeout, part 3: Going beyond two\" href=\"https:\/\/devblogs.microsoft.com\/oldnewthing\/20240502-00\/?p=109721\"> how to await an arbitrary number of handles with a common timeout<\/a>. But we found that there were two fundamental problems: The awaiter might not be movable, and we don&#8217;t want to throw an exception after some of the handles have been signaled (because that causes us to lose track of them).<\/p>\n<p>Since we don&#8217;t control the awaiter used by <code>resume_<wbr \/>on_<wbr \/>signal<\/code>, we&#8217;ll have to switch to something we do control.<\/p>\n<p>We&#8217;ll write our own awaiter.<\/p>\n<p>Fortunately, writing an awaiter is easier than writing a coroutine promise. We just need to implement the three awaiter methods: <code>await_ready<\/code>, <code>await_suspend<\/code>, and <code>await_resume<\/code>.<\/p>\n<p>In order to avoid the problem of throwing an exception partway through, we need to make sure we set up everything that could possibly throw an exception before we start waiting on any of the handles.<\/p>\n<p>Here&#8217;s our first attempt. Let&#8217;s start with the simple case that we are given a counted array of <code>HANDLE<\/code>s. 
Our function prototype will be this:<\/p>\n<pre>auto resume_on_all_signaled(HANDLE* handles, uint32_t size,\r\n    std::optional&lt;winrt::Windows::Foundation::TimeSpan&gt; timeout\r\n        = std::nullopt);\r\n<\/pre>\n<p>I changed the <code>timeout<\/code> parameter to an optional <code>TimeSpan<\/code>, where an empty value means that there is no timeout. This avoids problems in the original code where 0 meant &#8220;no timeout (wait indefinitely)&#8221;, but a value of zero, or even a negative value, could be generated by mistake, say because the deadline has been reached or has already been passed. Making it an explicitly optional parameter avoids this edge case where a computed timeout happens to match the sentinel value. It also means that you will be able to pass a timeout of zero to probe the handles without waiting.<\/p>\n<p>We start with this guy:<\/p>\n<pre>struct resume_all_state\r\n{\r\n    struct resume_all_awaiter* m_parent;\r\n    HANDLE m_handle;\r\n    bool* m_result;\r\n    wil::unique_threadpool_wait_nowait m_wait;\r\n};\r\n<\/pre>\n<p>The <code>resume_<wbr \/>all_<wbr \/>state<\/code> holds the information we need about each handle. 
It holds a pointer to the awaiter (to be defined below), the handle we are waiting for, where we should record the result of the handle wait, and the threadpool wait that will notify us when the handle is signaled (or the timeout elapses).<\/p>\n<pre>struct resume_all_awaiter\r\n{\r\n<\/pre>\n<p>To save ourselves some typing, we&#8217;ll create a type alias.<\/p>\n<pre>    using TimeSpan = winrt::Windows::Foundation::TimeSpan;\r\n<\/pre>\n<p>And then we can declare our member variables.<\/p>\n<pre>    std::atomic&lt;uint32_t&gt; m_remaining;\r\n    std::vector&lt;resume_all_state&gt; m_states;\r\n    winrt::com_array&lt;bool&gt; m_results;\r\n    std::coroutine_handle&lt;&gt; m_resume;\r\n    std::optional&lt;TimeSpan&gt; m_timeout;\r\n<\/pre>\n<p>The awaiter keeps track of a few things.<\/p>\n<ul>\n<li><code>m_remaining<\/code>: The number of handles for which we are still waiting for a result. This decreases each time a handle becomes signaled or times out, and when it reaches zero, we resume the coroutine.<\/li>\n<li><code>m_states<\/code>: A vector of <code>resume_<wbr \/>all_<wbr \/>state<\/code>s, one for each handle.<\/li>\n<li><code>m_results<\/code>: The <code>com_array<\/code> which holds the results that we return from the <code>co_await<\/code>.<\/li>\n<li><code>m_resume<\/code>: The coroutine to resume once we get all the results.<\/li>\n<li><code>m_timeout<\/code>: The timeout after which we give up waiting for the handles.<\/li>\n<\/ul>\n<p>Okay, let&#8217;s write the constructor.<\/p>\n<pre>    resume_all_awaiter(HANDLE* handles, uint32_t size,\r\n        std::optional&lt;TimeSpan&gt; timeout) :\r\n        m_remaining(size),\r\n        m_states(size),\r\n        m_results(size),\r\n        m_timeout(timeout)\r\n    {\r\n        for (auto index = 0U; index &lt; size; ++index) {\r\n            auto&amp; s = m_states[index];\r\n            s.m_parent = this;\r\n            s.m_handle = handles[index];\r\n            s.m_result = &amp;m_results[index];\r\n            
s.m_wait.reset(winrt::check_pointer(\r\n                CreateThreadpoolWait(callback, &amp;s, nullptr)));\r\n        }\r\n    }\r\n<\/pre>\n<p>We use the <code>size<\/code> to establish the number of <code>resume_<wbr \/>all_<wbr \/>state<\/code>s we need, the number of handles we are still waiting for (namely, all of them), and the number of <code>bool<\/code>s we need to return. We also save the timeout for later.<\/p>\n<p>Inside the constructor body, we initialize the states with a pointer back to the <code>awaiter<\/code>, the handle to (eventually) wait for, a pointer to where we want to record the wait result, and a threadpool wait that uses the corresponding <code>resume_<wbr \/>all_<wbr \/>state<\/code> object as the callback data.<\/p>\n<p>It is important that the vector not be reallocated once we pass a pointer to the <code>resume_<wbr \/>all_<wbr \/>state<\/code> to <code>Create\u00adThreadpool\u00adWait<\/code>, because reallocation will move the <code>resume_<wbr \/>all_<wbr \/>state<\/code> objects, leaving the pointer dangling and producing a use-after-free bug.<\/p>\n<p>Note that we copy the handles into our <code>resume_<wbr \/>all_<wbr \/>state<\/code> objects rather than just saving the original pointer and size. That&#8217;s because the caller might not <code>co_await<\/code> the awaiter immediately, and the pointer we received might have been a temporary.<\/p>\n<pre>auto awaiter = resume_on_all_signaled(std::array{ h1, h2 }.data(), 2);\r\nco_await awaiter;\r\n<\/pre>\n<p>Yes, this is a weird-sounding corner case, but it&#8217;ll be important later.<\/p>\n<p>The most important thing right now is that we do all the things that could potentially fail right up front in the constructor. 
That way, if the <code>co_await<\/code> throws an exception, the caller knows that no handles have been waited on, and the states of the objects in question have not been modified.<\/p>\n<pre>    bool await_ready() noexcept { return false; }\r\n<\/pre>\n<p>The <code>await_ready<\/code> is easy: We are never ready. We always ask for the coroutine to be suspended. Which is what comes next:<\/p>\n<pre>    void await_suspend(std::coroutine_handle&lt;&gt; resume) noexcept\r\n    {\r\n        m_resume = resume;\r\n\r\n        FILETIME ft;\r\n        FILETIME* timeout = nullptr;\r\n        if (m_timeout) {\r\n            auto count = (std::max)(m_timeout-&gt;count(), TimeSpan::rep(0));\r\n            ft = wil::filetime::from_int64(-count);\r\n            timeout = &amp;ft;\r\n        }\r\n\r\n        for (auto&amp;&amp; s : m_states) {\r\n            SetThreadpoolWait(s.m_wait.get(), s.m_handle, timeout);\r\n        }\r\n    }\r\n<\/pre>\n<p>We start by saving the coroutine to be resumed when all the handles have either waited successfully or timed out.<\/p>\n<p>Next, we convert the <code>m_timeout<\/code> to a format that <code>Set\u00adThreadpool\u00adWait<\/code> expects. There are three cases.<\/p>\n<ul>\n<li>If the <code>m_timeout<\/code> is empty, then we are waiting with no timeout, and the way to specify that to <code>Set\u00adThreadpool\u00adWait<\/code> is to pass <code>nullptr<\/code>.<\/li>\n<li>If the <code>m_timeout<\/code> is negative, then we clamp it to zero. This accommodates edge cases where the code tries to wait for handles just after the deadline has passed.<\/li>\n<li>We then pass that timeout (in the form of a <code>FILETIME<\/code>) to <code>Set\u00adThreadpool\u00adWait<\/code> as a negative value, since that&#8217;s the way that <code>Set\u00adThreadpool\u00adWait<\/code> represents elapsed time. 
(Positive values represent absolute time.)<\/li>\n<\/ul>\n<p>I parenthesized <code>std::max<\/code> to avoid <code>max<\/code> being recognized as a function-like macro. For historical reasons, <code>windows.h<\/code> defines <code>min<\/code> and <code>max<\/code> macros, and we don&#8217;t want those. You can suppress those macro definitions by saying <code>NOMINMAX<\/code> before including <code>windows.h<\/code>, but it&#8217;s common in library code to parenthesize <code>std::min<\/code> and <code>std::max<\/code> to avoid the problem entirely.<\/p>\n<pre>    static void CALLBACK callback(PTP_CALLBACK_INSTANCE,\r\n        void* context, PTP_WAIT, TP_WAIT_RESULT result)\r\n    {\r\n        auto&amp; s = *reinterpret_cast&lt;resume_all_state*&gt;(context);\r\n        *s.m_result = (result == WAIT_OBJECT_0);\r\n        if (s.m_parent-&gt;m_remaining.fetch_sub(1,\r\n                 std::memory_order_release) == 1) {\r\n            s.m_parent-&gt;m_resume();\r\n        }\r\n    }\r\n<\/pre>\n<p>As each handle wait completes, we recover the <code>resume_<wbr \/>all_<wbr \/>state<\/code> object for that handle and use it to record whether the wait succeeded. We then atomically decrement the number of remaining handles, and if it reaches zero, we resume the coroutine. 
Since we used a <code>unique_<wbr \/>threadpool_<wbr \/>wait_<wbr \/>nowait<\/code>, the destructor of the threadpool wait won&#8217;t wait for callbacks to complete, which in our case is a good thing, because waiting for the callback to complete would lead to a deadlock: The destructor of the awaiter would wait for the callback to complete, but the destructor is running as part of the coroutine resumption, which is happening <i>in the callback<\/i>.\u00b9<\/p>\n<p>The <code>--<\/code> operator on a <code>std::atomic<\/code> uses sequential consistency semantics, but we need only release semantics (we are publishing a value, namely the wait result), so we use <code>fetch_sub<\/code>, which allows us to specify a memory order. The <code>fetch_sub<\/code> method returns the <i>previous<\/i> value, so we detect that we decremented to zero by seeing if the previous value was 1.<\/p>\n<p>The last thing the awaiter needs to do is return the results when the coroutine resumes.<\/p>\n<pre>    auto await_resume() noexcept\r\n    {\r\n        return std::move(m_results);\r\n    }\r\n};\r\n<\/pre>\n<p>The <code>resume_<wbr \/>on_<wbr \/>all_<wbr \/>signaled<\/code> function now just needs to return a properly-constructed awaiter.<\/p>\n<pre>auto resume_on_all_signaled(HANDLE* handles, uint32_t size,\r\n    std::optional&lt;winrt::Windows::Foundation::TimeSpan&gt; timeout\r\n        = std::nullopt)\r\n{\r\n    return resume_all_awaiter(handles, size, timeout);\r\n}\r\n<\/pre>\n<p>Okay, now that we have a basic version, we can start fine-tuning it. 
Next time.<\/p>\n<p><b>Bonus chatter<\/b>: When this problem was first presented to me, I said, &#8220;Just create an awaiter that creates one threadpool wait for each handle, and which resumes when all the waits complete or time out.&#8221; This is just the realization of that basic idea.<\/p>\n<p>\u00b9 This trick of using a <code>_nowait<\/code> threadpool wait handle works only because we never resume the coroutine until after all the waits have completed. If there were cases where the coroutine resumes before all the waits have completed, we would need to use a waiting version of the threadpool wait handle to ensure that the callback doesn&#8217;t access memory after it has been freed. <a title=\"Avoiding deadlocks when cancelling a thread pool callback, part 1: External callback data\" href=\"https:\/\/devblogs.microsoft.com\/oldnewthing\/20180503-00\/?p=98665\"> We could use <code>Dissociate\u00adCurrent\u00adThread\u00adFrom\u00adCallback<\/code><\/a> just before resuming the coroutine to exempt the current callback from the wait.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>To stop relying on unspecified behavior.<\/p>\n","protected":false},"author":1069,"featured_media":111744,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[1],"tags":[25],"class_list":["post-109725","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-oldnewthing","tag-code"],"acf":[],"blog_post_summary":"<p>To stop relying on unspecified 
behavior.<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/posts\/109725","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/users\/1069"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/comments?post=109725"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/posts\/109725\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/media\/111744"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/media?parent=109725"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/categories?post=109725"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/tags?post=109725"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}