{"id":55932,"date":"2010-04-04T14:06:00","date_gmt":"2010-04-04T14:06:00","guid":{"rendered":"https:\/\/blogs.msdn.microsoft.com\/pfxteam\/2010\/04\/04\/parallelextensionsextras-tour-2-tasktresult-toobservable\/"},"modified":"2010-04-04T14:06:00","modified_gmt":"2010-04-04T14:06:00","slug":"parallelextensionsextras-tour-2-tasktresult-toobservable","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/dotnet\/parallelextensionsextras-tour-2-tasktresult-toobservable\/","title":{"rendered":"ParallelExtensionsExtras Tour &#8211; #2 &#8211; Task.ToObservable"},"content":{"rendered":"<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\"><em>(The full set of ParallelExtensionsExtras Tour posts is available&nbsp;<\/em><a href=\"https:\/\/blogs.msdn.com\/pfxteam\/archive\/2010\/04\/04\/9990342.aspx\"><font color=\"#dd4a21\"><em>here<\/em><\/font><\/a><em>.)<\/em>&nbsp;&nbsp;<\/font><\/font><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">In our <a href=\"https:\/\/blogs.msdn.com\/pfxteam\/archive\/2010\/04\/04\/9990343.aspx\">previous ParallelExtensionsExtras tour post<\/a>, we discussed a custom implementation of the LINQ operators, in particular for working with Task&lt;TResult&gt; instances in an asynchronous manner. <\/font><\/font><font size=\"3\"><font face=\"Calibri\">There is already an impressive implementation of the LINQ operators and more for working with asynchronous operations: <a href=\"https:\/\/msdn.microsoft.com\/en-us\/devlabs\/ee794896.aspx\">Reactive Extensions<\/a> (Rx).<span>&nbsp; <\/span>Rx provides a full LINQ implementation based on the IObservable&lt;T&gt; interface that was added to the .NET Framework 4. <\/font><\/font><font size=\"3\"><font face=\"Calibri\">As it turns out, while Task&lt;TResult&gt; does not currently implement IObservable&lt;T&gt;, it&rsquo;s actually quite a good fit to do so.<span>&nbsp; <\/span>An observable represents any number of values terminated by either an end of the stream or by an exception.<span>&nbsp; <\/span>A Task&lt;TResult&gt; fits this description: it completes with either a single value or with an exception.<span>&nbsp; <\/span>While it&rsquo;s possible that a future version of the .NET Framework will see Task&lt;TResult&gt; implement IObservable&lt;T&gt;, we can get the relevant behavior now by implementing it as an extension method for Task&lt;TResult&gt;; in fact, Rx includes just such an extension method, as does ParallelExtensionsExtras.<span>&nbsp; <\/span><\/p>\n<p><\/font><\/font><\/p>\n<p class=\"MsoNormal\"><font size=\"3\" face=\"Calibri\">Here we&rsquo;ll take a look at the implementation available in <\/font><a href=\"https:\/\/code.msdn.microsoft.com\/ParExtSamples\"><font size=\"3\" face=\"Calibri\">ParallelExtensionsExtras<\/font><\/a><font size=\"3\"><font face=\"Calibri\">, as part of the TaskExtrasExtensions.cs file.<span>&nbsp; <\/span>I&rsquo;ve omitted parameter validation for the sake of conciseness. <span>&nbsp;&nbsp;<\/span>With such an extension method, we can rewrite the LINQ query from our last post to instead be based on observables, e.g.<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">IObservable&lt;string&gt; result = from x in Task.Factory.StartNew( <\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>() =&gt; ProduceInt()).ToObservable()<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>from y in Task.Factory.StartNew(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/span>() =&gt; Process(x)).ToObservable()<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>select y.ToString();<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">First, we need our ToObservable extension method itself, which should accept a Task&lt;TResult&gt; and return an IObservable&lt;TResult&gt;.<span>&nbsp; <\/span>To do that, we need a type that implements IObservable&lt;TResult&gt; and wraps the Task&lt;TResult&gt;, so that we can work with the Task&lt;TResult&gt; when Subscribe is called on the IObservable&lt;TResult&gt;.<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">public static IObservable&lt;TResult&gt; ToObservable&lt;TResult&gt;(<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>this Task&lt;TResult&gt; task)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>return new TaskObservable&lt;TResult&gt; { _task = task };<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">Now we just need to implement TaskObservable&lt;TResult&gt;, which implements IObservable&lt;TResult&gt; and its one method: Subscribe.<span>&nbsp; <\/span>When Subscribe is called to register an IObserver&lt;T&gt;, we&rsquo;ll take advantage of ContinueWith to get a callback when the task completes.<span>&nbsp; <\/span>If the&nbsp;task completed successfully, we&rsquo;ll pass along its Result to the observer&rsquo;s OnNext, and then notify the observer through OnCompleted that the observable will not be sending out any more values.<span>&nbsp; <\/span>If the task failed, we&rsquo;ll pass along its Exception the observer through its OnError method.<span>&nbsp; <\/span>And if the task was canceled, we&rsquo;ll pass along a TaskCanceledException (which derives from OperationCanceledException).<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">That&rsquo;s the bulk of the implementation, shown here:<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">private sealed class TaskObservable&lt;TResult&gt; : IObservable&lt;TResult&gt;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>internal Task&lt;TResult&gt; _task;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>public IDisposable Subscribe(IObserver&lt;TResult&gt; observer)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>var cts = new CancellationTokenSource();<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>_task.ContinueWith(t =&gt;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>switch (t.Status)<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>{<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>case TaskStatus.RanToCompletion:<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>observer.OnNext(_task.Result);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>observer.OnCompleted();<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>break;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>case TaskStatus.Faulted:<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>observer.OnError(_task.Exception);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>break;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>case TaskStatus.Canceled:<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span><span>&nbsp;&nbsp;<\/span>observer.OnError(new TaskCanceledException(t));<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>break;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>}, cts.Token);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <\/span>return new CancelOnDispose { Source = cts };<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><span>&nbsp;&nbsp;&nbsp; <\/span>}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">}<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">There&rsquo;s one piece I have not described, and that&rsquo;s dealing with unsubscription.<span>&nbsp; <\/span>The Subscribe call returns an IDisposable that can be used to cancel the observer&rsquo;s subscription, effectively unsubscribing the observer.<span>&nbsp; <\/span>We handle that with task cancellation.<span>&nbsp; <\/span>The IDisposable we return from Subscribe is just a simple wrapper around a CancellationTokenSource, such that the source will have cancellation requested when the object is disposed (e.g. Dispose() { _source.Cancel(); }).<span>&nbsp; <\/span>This source&rsquo;s CancellationToken is provided to the ContinueWith method, such that if the subscription is canceled, so too is the continuation.<\/p>\n<p><\/font><\/font><\/p><\/p>\n","protected":false},"excerpt":{"rendered":"<p>(The full set of ParallelExtensionsExtras Tour posts is available&nbsp;here.)&nbsp;&nbsp; In our previous ParallelExtensionsExtras tour post, we discussed a custom implementation of the LINQ operators, in particular for working with Task&lt;TResult&gt; instances in an asynchronous manner. There is already an impressive implementation of the LINQ operators and more for working with asynchronous operations: Reactive Extensions (Rx).&nbsp; [&hellip;]<\/p>\n","protected":false},"author":360,"featured_media":58792,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[7908],"tags":[7907,7911,7909,7924,7912],"class_list":["post-55932","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-pfxteam","tag-net-4","tag-code-samples","tag-parallel-extensions","tag-parallelextensionsextras","tag-task-parallel-library"],"acf":[],"blog_post_summary":"<p>(The full set of ParallelExtensionsExtras Tour posts is available&nbsp;here.)&nbsp;&nbsp; In our previous ParallelExtensionsExtras tour post, we discussed a custom implementation of the LINQ operators, in particular for working with Task&lt;TResult&gt; instances in an asynchronous manner. There is already an impressive implementation of the LINQ operators and more for working with asynchronous operations: Reactive Extensions (Rx).&nbsp; [&hellip;]<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/posts\/55932","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/users\/360"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/comments?post=55932"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/posts\/55932\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/media\/58792"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/media?parent=55932"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/categories?post=55932"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/tags?post=55932"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}