{"id":55920,"date":"2010-04-14T09:10:00","date_gmt":"2010-04-14T09:10:00","guid":{"rendered":"https:\/\/blogs.msdn.microsoft.com\/pfxteam\/2010\/04\/14\/parallelextensionsextras-tour-10-pipeline\/"},"modified":"2010-04-14T09:10:00","modified_gmt":"2010-04-14T09:10:00","slug":"parallelextensionsextras-tour-10-pipeline","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/dotnet\/parallelextensionsextras-tour-10-pipeline\/","title":{"rendered":"ParallelExtensionsExtras Tour &#8211; #10 &#8211; Pipeline"},"content":{"rendered":"<p><font size=\"3\"><font face=\"Calibri\"><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\"><em>(The full set of ParallelExtensionsExtras Tour posts is available&nbsp;<\/em><a href=\"https:\/\/blogs.msdn.com\/pfxteam\/archive\/2010\/04\/04\/9990342.aspx\"><font color=\"#dd4a21\"><em>here<\/em><\/font><\/a><em>.)<\/em>&nbsp;<\/font><\/font><\/p>\n<p class=\"MsoNormal\">Producer\/consumer is a fundamental <a href=\"https:\/\/www.microsoft.com\/downloads\/details.aspx?FamilyID=86b3d32b-ad26-4bb8-a3ae-c1637026c3ee&amp;displaylang=en\">pattern<\/a> employed in many parallel applications.<span>&nbsp; <\/span>With producer\/consumer, one or more producer threads generate data that is consumed by one or more consumer threads.<span>&nbsp; <\/span>These consumers can themselves also be producers of further data, typically based on the data they were consuming, which is then consumed again by one or more consumers.<span>&nbsp; <\/span>And so on. 
This forms a pipeline, where each level of consumers represents another &ldquo;stage&rdquo; in the pipeline, like workers on an assembly line.<\/p>\n<p><\/font><\/p>\n<p class=\"MsoNormal\"><font size=\"3\"><font face=\"Calibri\">Consider an algorithm that will compress and then encrypt blocks of data.<span>&nbsp; <\/span>The algorithms involved carry data over from one block to the next, which means we must compress and encrypt blocks in order.<span>&nbsp; <\/span>We can&rsquo;t run the compression and encryption algorithms on independent blocks concurrently, but we can run the compression concurrently with the encryption: while we&rsquo;re encrypting compressed block N, we can be compressing block N+1.<span>&nbsp; <\/span>This is naturally modeled as a pipeline, e.g.:<\/p>\n<p><\/font><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">var compressAndEncrypt = <\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp; Pipeline.Create(rawChunk =&gt; Compress(rawChunk))<\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .Next(compressedChunk =&gt; Encrypt(compressedChunk));<\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\"><\/p>\n<p><\/font>&nbsp;<\/p>\n<p class=\"Code\"><font face=\"Consolas\">IEnumerable&lt;byte[]&gt; inputChunks = &#8230;;<\/p>\n<p><\/font><\/p>\n<p class=\"Code\"><font face=\"Consolas\">IEnumerable&lt;byte[]&gt; results = <br>&nbsp;&nbsp;&nbsp; compressAndEncrypt.Process(inputChunks);<\/p>\n<p><\/font><\/p>\n<p class=\"Code\">\n<p><font face=\"Consolas\">&nbsp;<\/font><\/p>\n<\/p>\n<p class=\"MsoNormal\"><font size=\"3\" face=\"Calibri\">There are many ways pipelines can be implemented, and in some cases, you want to dedicate one or more threads to processing each stage of the pipeline.<span>&nbsp; <\/span>The Pipeline class in the 
Pipeline.cs file in <a href=\"https:\/\/code.msdn.microsoft.com\/ParExtSamples\">ParallelExtensionsExtras<\/a> provides this functionality, delivering a simple API on top of relatively powerful functionality.<\/p>\n<p>&nbsp;<\/font><font size=\"3\" face=\"Calibri\">Pipeline uses a Task to represent each stage of the pipeline.<span>&nbsp; <\/span>This Task is executed on a <\/font><a href=\"https:\/\/blogs.msdn.com\/pfxteam\/archive\/2010\/04\/09\/9990424.aspx\"><font size=\"3\" face=\"Calibri\">ThreadPerTaskScheduler<\/font><\/a><font size=\"3\"><font face=\"Calibri\"> so that each stage gets its own dedicated thread.<span>&nbsp; <\/span>Further, each stage supports a configurable degree of parallelism, such that multiple dedicated threads may be used to process each stage.<span>&nbsp; <\/span>To handle this, the Task uses a Parallel.ForEach to process the input enumerable to that stage, where the Parallel.ForEach is also targeted to the ThreadPerTaskScheduler instance.<span>&nbsp; <\/span>An instance of BlockingCollection&lt;T&gt; is employed between each stage as the channel for passing data between the producer and the consumer, and the entire pipeline is cancelable with a CancellationToken.<\/font><\/font><\/p>\n<p><\/font><\/p>\n","protected":false},"excerpt":{"rendered":"<p>(The full set of ParallelExtensionsExtras Tour posts is available&nbsp;here.)&nbsp; Producer\/consumer is a fundamental pattern employed in many parallel applications.&nbsp; With producer\/consumer, one or more producer threads generate data that is consumed by one or more consumer threads.&nbsp; These consumers can themselves also be producers of further data, typically based on the data they were consuming, 
[&hellip;]<\/p>\n","protected":false},"author":360,"featured_media":58792,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[7908],"tags":[7907,7911,7909,7924],"class_list":["post-55920","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-pfxteam","tag-net-4","tag-code-samples","tag-parallel-extensions","tag-parallelextensionsextras"],"acf":[],"blog_post_summary":"<p>(The full set of ParallelExtensionsExtras Tour posts is available&nbsp;here.)&nbsp; Producer\/consumer is a fundamental pattern employed in many parallel applications.&nbsp; With producer\/consumer, one or more producer threads generate data that is consumed by one or more consumer threads.&nbsp; These consumers can themselves also be producers of further data, typically based on the data they were consuming, [&hellip;]<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/posts\/55920","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/users\/360"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/comments?post=55920"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/posts\/55920\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/media\/58792"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/media?parent=55920"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/categories?post=55920"},{"taxonom
y":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/dotnet\/wp-json\/wp\/v2\/tags?post=55920"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}