{"id":104949,"date":"2021-03-11T07:00:00","date_gmt":"2021-03-11T15:00:00","guid":{"rendered":"https:\/\/devblogs.microsoft.com\/oldnewthing\/?p=104949"},"modified":"2021-03-12T07:21:30","modified_gmt":"2021-03-12T15:21:30","slug":"20210311-00","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/oldnewthing\/20210311-00\/?p=104949","title":{"rendered":"Creating other types of synchronization objects that can be used with co_await, part 3: Parallel resumption"},"content":{"rendered":"<p>Last time, we developed <a title=\"Creating other types of synchronization objects that can be used with co_await, part 2: The basic library\" href=\"https:\/\/devblogs.microsoft.com\/oldnewthing\/20210310-00\/?p=104945\"> a library for building awaitable synchronization objects<\/a>. I noted that when the coroutines are released, they are resumed in sequence, which means that one coroutine can prevent others from progressing. Let&#8217;s resume them in parallel.<\/p>\n<p>One option is to use <code>Try\u00adSubmit\u00adThreadpool\u00adCallback<\/code> to put the resumption on the thread pool. 
In the <code>awaitable_<wbr \/>state<\/code> class, replace the <code>resume_<wbr \/>node<\/code> method with this version:<\/p>\n<pre>        static void CALLBACK resume_node_callback(\r\n            PTP_CALLBACK_INSTANCE, void* context) noexcept\r\n        {\r\n            std::experimental::coroutine_handle&lt;&gt;::\r\n                from_address(context).resume();\r\n        }\r\n\r\n        void resume_node(impl::node_base* node) noexcept\r\n        {\r\n            if (!TrySubmitThreadpoolCallback(\r\n                resume_node_callback,\r\n                extra_node(*node).handle.address(),\r\n                nullptr))\r\n            {\r\n                std::terminate(); \/\/ fatal: no way to report the failure\r\n            }\r\n        }\r\n<\/pre>\n<p>Instead of resuming the handle immediately and synchronously, we submit a callback to the thread pool, and have the callback resume the coroutine.<\/p>\n<p>This works, but there is a problem if <code>Try\u00adSubmit\u00adThreadpool\u00adCallback<\/code> fails, since we have no way to report an error to the caller. All we can do is terminate the process.<\/p>\n<p>An alternative is to use the <code>Create\u00adThreadpool\u00adWork<\/code> \/ <code>Submit\u00adThreadpool\u00adWork<\/code> pattern, which has the advantage of front-loading all of the error conditions. That way, we can throw a low-memory exception at the point of the <code>co_await<\/code> rather than finding ourselves stuck when it comes time to resume.<\/p>\n<p>Our <code>node_<wbr \/>handle<\/code> now babysits a threadpool work item:<\/p>\n<pre>    struct node_handle : node_base\r\n    {\r\n        PTP_WORK work{};\r\n    };\r\n<\/pre>\n<p>This member records the work item that we will use to resume the coroutine. It is non-null if the coroutine is on the synchronization object&#8217;s wait list. 
We set this up as part of the suspension:<\/p>\n<pre>        bool await_suspend(\r\n            std::experimental::coroutine_handle&lt;&gt; handle,\r\n            impl::node&lt;extra_await_data&gt;&amp; node)\r\n        {\r\n            auto guard = std::lock_guard(mutex);\r\n            if (parent().claim(node.extra)) return false;\r\n            <span style=\"color: blue;\">node.work = check_pointer(\r\n                CreateThreadpoolWork(resume_node_callback,\r\n                    handle.address(), nullptr));<\/span>\r\n            sentinel.append_node(node);\r\n            return true;\r\n        }\r\n<\/pre>\n<p>When we realize that we need to suspend, we create a work item that will perform the resumption. We can raise a low memory exception at this point, and it will be captured into the caller.<\/p>\n<p>Resuming the coroutine node consists of submitting the work:<\/p>\n<pre>        void resume_node(impl::node_base* node) noexcept\r\n        {\r\n            <span style=\"color: blue;\">SubmitThreadpoolWork(extra_node(*node).work);<\/span>\r\n        }\r\n<\/pre>\n<p>And we move the work item cleanup into the callback function:<\/p>\n<pre>        static void CALLBACK resume_node_callback(\r\n            PTP_CALLBACK_INSTANCE, void* context<span style=\"color: blue;\">, PTP_WORK work<\/span>)\r\n            noexcept\r\n        {\r\n            <span style=\"color: blue;\">CloseThreadpoolWork(work);<\/span>\r\n            std::experimental::coroutine_handle&lt;&gt;::\r\n                from_address(context).resume();\r\n        }\r\n\r\n<\/pre>\n<p>The work can be closed at any time after it is submitted: Closing a submitted work item does not cancel the outstanding work. 
We don&#8217;t want to slow down the <code>resume_<wbr \/>list<\/code> method, so we make the work item responsible for its own bookkeeping. That way, the cost is paid by the resuming coroutine rather than the signaling one.<\/p>\n<p>The other bit of bookkeeping is nulling out the <code>work<\/code> now that it&#8217;s been closed.<\/p>\n<pre>        void await_resume(\r\n            impl::node&lt;extra_await_data&gt;&amp; node) noexcept\r\n        {\r\n            <span style=\"color: blue;\">node.work = nullptr;<\/span>\r\n        }\r\n<\/pre>\n<p>And finally, we tweak our abandonment detection:<\/p>\n<pre>        void unlink_node(impl::node_base&amp; node) noexcept\r\n        {\r\n            <span style=\"color: blue;\">auto work = extra_node(node).work;\r\n            if (work) {\r\n                CloseThreadpoolWork(work);<\/span>\r\n                auto guard = std::lock_guard(mutex);\r\n                node.prev-&gt;next = node.next;\r\n                node.next-&gt;prev = node.prev;\r\n            }\r\n        }\r\n<\/pre>\n<p>There is an additional optimization decision to be made here, which is finding the best place to close the work item. 
Here&#8217;s the diagram again:<\/p>\n<div id=\"p20210311_head\" style=\"display: none;\">\u00a0<\/div>\n<table class=\"cp3\" style=\"border-collapse: collapse; text-align: center;\" border=\"0\" cellspacing=\"0\" cellpadding=\"3\">\n<tbody>\n<tr>\n<td style=\"border: solid 1px black;\" colspan=\"5\">Awaiter constructed<br \/>\n<code>work = nullptr;<\/code><\/td>\n<\/tr>\n<tr>\n<td>\u2193<\/td>\n<td>&nbsp;<\/td>\n<td>\u2193<\/td>\n<td>&nbsp;<\/td>\n<td>&nbsp;<\/td>\n<\/tr>\n<tr>\n<td style=\"border: solid 1px black;\" rowspan=\"7\">No suspension<\/td>\n<td>&nbsp;<\/td>\n<td style=\"border: solid 1px black;\">Suspended<br \/>\n<code>await_suspend<\/code><br \/>\n<code>work = work item;<\/code><\/td>\n<td>&nbsp;<\/td>\n<td>&nbsp;<\/td>\n<\/tr>\n<tr>\n<td>&nbsp;<\/td>\n<td>\u2193<\/td>\n<td>\u2198\ufe0e<\/td>\n<td>&nbsp;<\/td>\n<\/tr>\n<tr>\n<td>&nbsp;<\/td>\n<td style=\"border: solid 1px black;\">Resume<br \/>\n<code>resume_node<\/code><\/td>\n<td>&nbsp;<\/td>\n<td style=\"border: solid 1px black;\" rowspan=\"5\">Abandoned<\/td>\n<\/tr>\n<tr>\n<td>&nbsp;<\/td>\n<td>\u2193<\/td>\n<td>&nbsp;<\/td>\n<\/tr>\n<tr>\n<td>&nbsp;<\/td>\n<td style=\"border: solid 1px black;\">Resuming<br \/>\n<code>resume_node_callback<\/code><br \/>\nwork closed<\/td>\n<td>&nbsp;<\/td>\n<\/tr>\n<tr>\n<td>&nbsp;<\/td>\n<td>\u2193<\/td>\n<td>&nbsp;<\/td>\n<\/tr>\n<tr>\n<td>&nbsp;<\/td>\n<td style=\"border: solid 1px black;\">Resumed<br \/>\n<code>await_resume<\/code><br \/>\n<code>work = nullptr;<\/code><\/td>\n<td>&nbsp;<\/td>\n<\/tr>\n<tr>\n<td>\u2193<\/td>\n<td>&nbsp;<\/td>\n<td>\u2193<\/td>\n<td>&nbsp;<\/td>\n<td>\u2193<\/td>\n<\/tr>\n<tr>\n<td style=\"border: solid 1px black;\" colspan=\"5\">Awaiter destructed<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<p>The analyses for the no-suspend path and the abandonment path are the same as last time. The extra decision in the center path is when to close the work item. 
I decided to do it in <code>resume_<wbr \/>node_<wbr \/>callback<\/code>: I definitely want the work item to be responsible for closing its own work. That avoids adding extra responsibilities to the signaling coroutine, which is only fair because you don&#8217;t want to bog down the signaling code with work that wasn&#8217;t even its idea! And to reduce code size, I want closing the work item to be done in shared code, which in this case is the thread pool work item callback itself. That same callback is going to be used for all resumptions of all nodes used by any client of this library. If closing the work item had been moved to <code>await_<wbr \/>resume<\/code>, then that would get inlined into every coroutine&#8217;s resumption code.<\/p>\n<p>Okay, that was perhaps a deeper dive than you wanted into the subject of creating an awaitable synchronization object. But now that I have this whole thing created, I want to drive it around a bit. We&#8217;ll start that <a title=\"Creating other types of synchronization objects that can be used with co_await, part 4: The manual-reset event\" href=\"https:\/\/devblogs.microsoft.com\/oldnewthing\/20210312-00\/?p=104955\"> next time<\/a>.<\/p>\n<p>\n<script>\nwindow.addEventListener(\"load\", function() {\n  var fullFF = getComputedStyle(document.querySelector(\"body\")).fontFamily;\n  var simpleFF = fullFF.replace(\/ Emoji\/g, \"\");\n  \/\/ break up \"style\" to prevent wordpress from injecting random junk\n  document.getElementById(\"p20210311_head\").innerHTML =\n`<s` + `tyle>\nbody { font-family: ${simpleFF}; }\n.emoji { font-family: ${fullFF}; }\n<\/s` + `tyle>`;\n});\n<\/script><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Resuming waiting coroutines in parallel on the thread 
pool.<\/p>\n","protected":false},"author":1069,"featured_media":111744,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[1],"tags":[25],"class_list":["post-104949","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-oldnewthing","tag-code"],"acf":[],"blog_post_summary":"<p>Resuming waiting coroutines in parallel on the thread pool.<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/posts\/104949","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/users\/1069"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/comments?post=104949"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/posts\/104949\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/media\/111744"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/media?parent=104949"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/categories?post=104949"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/oldnewthing\/wp-json\/wp\/v2\/tags?post=104949"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}