{"id":9978,"date":"2019-01-18T11:39:57","date_gmt":"2019-01-18T19:39:57","guid":{"rendered":"https:\/\/www.microsoft.com\/developerblog\/?p=9978"},"modified":"2020-03-19T12:42:58","modified_gmt":"2020-03-19T19:42:58","slug":"running-parallel-apache-spark-notebook-workloads-on-azure-databricks","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/ise\/running-parallel-apache-spark-notebook-workloads-on-azure-databricks\/","title":{"rendered":"Running Parallel Apache Spark Notebook Workloads On Azure Databricks"},"content":{"rendered":"<h2>Background and Problem Definition<\/h2>\n<p>In today\u2019s fast-moving world, having access to up-to-date business metrics is key to making data-driven customer-centric decisions. With over 1000 daily flights servicing more than 100 cities and 42 million customers per year, JetBlue has a lot of data to crunch, answering questions such as: What is the utilization of a given\u00a0route? What is the projected load of a flight? How many flights were on-time? What is the idle time of each plane model at a given airport? To provide decision-makers answers to these and other inquiries in a timely fashion, JetBlue partnered with Microsoft to develop a flexible and extensible reporting solution based on <a href=\"https:\/\/spark.apache.org\/docs\/2.4.0\/\">Apache Spark<\/a> and <a href=\"https:\/\/azure.microsoft.com\/en-us\/services\/databricks\/\">Azure Databricks<\/a>.<\/p>\n<p>A key data source for JetBlue is a recurring batch file which lists all customer bookings created or changed during the last batch period. For example, a batch file on January 10\u00a0may include a newly created future booking for February 2, an upgrade to a reservation for a flight on March 5,\u00a0or a listing of customers who flew on all flights on January 10. To keep business metrics fresh, each batch file must result in the re-computation of the metrics for each day listed in the file. 
This poses an interesting scaling challenge for the Spark job computing the metrics: how do we keep the metrics production code simple and readable while still being able to re-process metrics for hundreds of days in a timely fashion?<\/p>\n<p>The remainder of this article will walk through various scaling techniques for dealing with scenarios that require large numbers of Spark jobs to be run on Azure Databricks and present solutions that were\u00a0able to reduce processing times by over 60% compared to our initial solution.<\/p>\n<h2>Developing a Solution<\/h2>\n<p>At the outset of the project, we had two key solution constraints: time and simplicity. First, to aid with maintainability and onboarding, all Spark code should be simple and easily understandable even to novices in the technology. Second, to keep business metrics relevant for JetBlue decision-makers, all re-computations should terminate within a few minutes. These two constraints were immediately at odds: a natural way to scale jobs in Spark is to leverage partitioning and operate on larger batches of data in one go; however, this complicates code understanding and performance tuning since developers must be familiar with partitioning, balancing data across partitions, etc. To keep the code as straightforward as possible, we therefore wanted to implement the business metrics Spark jobs in a direct and easy-to-follow way, and to have a single parameterized Spark job that computes the metrics for a given\u00a0booking day.<\/p>\n<h3>Cluster Size and Spark Job Processing Time<\/h3>\n<p>After implementing the business metrics Spark job with JetBlue, we immediately faced a scaling concern. For many Spark jobs, including JetBlue\u2019s, there is a ceiling on the speed-ups that can be gained by simply\u00a0adding more workers to the Spark cluster: past a certain point, adding more workers won\u2019t significantly decrease processing times. 
This is due to added communication overheads or simply\u00a0because there is not enough natural partitioning in the data to enable efficient distributed processing.<\/p>\n<p>Figure 1 below demonstrates the aforementioned cluster-size related Spark scaling limit with the example of a simple word-count job. The code for the job can be found in the Resources section below. The graph clearly\u00a0shows that\u00a0we encounter diminishing returns after adding only 5 machines to the cluster; past a cluster size of 15 machines, adding more machines to the cluster won\u2019t speed up the job.<\/p>\n<p>After using cluster size to scale JetBlue&#8217;s business metrics Spark job, we came to an unfortunate realization: re-processing the daily metrics would still take several hours, which was unacceptable given our goal of keeping re-computations within a few minutes.<\/p>\n<p><figure id=\"attachment_9981\" aria-labelledby=\"figcaption_attachment_9981\" class=\"wp-caption aligncenter\" ><img decoding=\"async\" class=\"aligncenter size-full wp-image-10579\" src=\"https:\/\/devblogs.microsoft.com\/cse\/wp-content\/uploads\/sites\/55\/2019\/01\/databricks-processing-time-cluster-size-scaling-limit.png\" alt=\"Image databricks processing time cluster size scaling limit\" width=\"481\" height=\"384\" srcset=\"https:\/\/devblogs.microsoft.com\/ise\/wp-content\/uploads\/sites\/55\/2019\/01\/databricks-processing-time-cluster-size-scaling-limit.png 481w, https:\/\/devblogs.microsoft.com\/ise\/wp-content\/uploads\/sites\/55\/2019\/01\/databricks-processing-time-cluster-size-scaling-limit-300x240.png 300w\" sizes=\"(max-width: 481px) 100vw, 481px\" \/><figcaption id=\"figcaption_attachment_9981\" class=\"wp-caption-text\"><br \/>Figure 1: Processing time versus cluster size of a simple word-count Spark job. 
We note that past a specific cluster size, adding more machines to a job doesn&#8217;t speed up the runtime anymore.<\/figcaption><\/figure><\/p>\n<h3>Parallel Execution of Spark Jobs on Azure Databricks<\/h3>\n<p>We noticed that JetBlue\u2019s business metrics Spark job is highly parallelizable: each day can be processed completely independently. However, using separate Databricks clusters to run JetBlue\u2019s business metrics Spark job on multiple days in parallel was not desirable\u00a0\u2013\u00a0having to deploy and monitor code in multiple execution environments would result in a large operational and tooling burden. We therefore reformulated the problem: was there a way to run JetBlue\u2019s jobs in parallel on the same cluster?<\/p>\n<h4>The Driver Notebook Pattern in Azure Databricks<\/h4>\n<p>Azure Databricks offers a mechanism to run sub-jobs from within a job via the <a href=\"https:\/\/docs.databricks.com\/user-guide\/notebooks\/notebook-workflows.html#run-usage\">dbutils.notebook.run<\/a> API. A simple usage of the API is\u00a0as follows:<\/p>\n<pre class=\"lang:scala decode:true\">\/\/ define some way to generate a sequence of workloads to run\r\nval jobArguments = ???\r\n\r\n\/\/ define the name of the Azure Databricks notebook to run\r\nval notebookToRun = ???\r\n\r\n\/\/ start the jobs\r\njobArguments.foreach(args =&gt;\r\n  dbutils.notebook.run(notebookToRun, timeoutSeconds = 0, args))<\/pre>\n<p>Using the dbutils.notebook.run API, we were able to keep JetBlue\u2019s main business metrics Spark job simple: the job only needs to concern itself with processing the metrics for a single day. We then created a separate \u201cdriver\u201d Spark job that manages the complexity of running the metrics job for all the requisite days. In this way we were able to hide the complexity of scheduling for performance from the business logic. 
This fulfilled our code simplicity goal with JetBlue.<\/p>\n<h4>Using Scala Parallel Collections to Run Parallel Spark Jobs<\/h4>\n<p>Upon further investigation, we learned that the run method is a blocking call. This essentially\u00a0means that the implementation is equivalent to running all the jobs in sequence, thus leading back to the previously experienced performance concerns. To achieve parallelism for JetBlue\u2019s workload, we next attempted to leverage Scala\u2019s parallel collections to launch the jobs:<\/p>\n<pre class=\"lang:scala decode:true\">\/\/ define some way to generate a sequence of workloads to run\r\nval jobArguments = ???\r\n\r\n\/\/ define the name of the Azure Databricks notebook to run\r\nval notebookToRun = ???\r\n\r\n\/\/ look up required context for parallel run calls\r\nval context = dbutils.notebook.getContext()\r\n\r\njobArguments.par.foreach(args =&gt; {\r\n  \/\/ ensure thread knows about databricks context\r\n  dbutils.notebook.setContext(context)\r\n  \/\/ start the job\r\n  dbutils.notebook.run(notebookToRun, timeoutSeconds = 0, args)\r\n})<\/pre>\n<p>Figure 3 at the end of this section shows that the parallel collections approach does offer some\u00a0performance benefits over running the workloads in sequence. However, we discovered that there are two factors limiting the parallelism of this implementation.<\/p>\n<p>First, Scala parallel collections will, by default, only use as many threads as there are cores available on the Spark driver machine. This means that if we use a cluster of DS3v2 nodes (each with 4 cores) the snippet above will launch at most\u00a04 jobs in parallel. This is undesirable given that the calls are IO-bound instead of CPU-bound and we could thus be supporting many more parallel run invocations.<\/p>\n<p>Additionally, while the code above does\u00a0<em>launch<\/em> Spark jobs in parallel, the Spark scheduler may not actually\u00a0<em>execute<\/em> the jobs in parallel. 
This is because Spark uses a <a href=\"https:\/\/spark.apache.org\/docs\/latest\/job-scheduling.html\">first-in-first-out scheduling<\/a> strategy by default. The Spark scheduler may attempt to parallelize some\u00a0tasks if there is spare CPU capacity available in the cluster, but this behavior may not optimally utilize the cluster.<\/p>\n<h4>Optimally Using Cluster Resources for Parallel Jobs Via Spark Fair Scheduler Pools<\/h4>\n<p>To further improve the runtime of JetBlue\u2019s parallel workloads, we leveraged the fact\u00a0that, as of <a href=\"https:\/\/docs.databricks.com\/release-notes\/runtime\/5.0.html\">runtime 5.0<\/a> at the time of writing, Azure Databricks supports Spark <a href=\"https:\/\/spark.apache.org\/docs\/latest\/job-scheduling.html#fair-scheduler-pools\">fair scheduler pools<\/a>. Fair scheduling in Spark means that\u00a0we can define multiple separate resource pools in the cluster which are all available for executing jobs independently. This enabled us to develop the following mechanism to guarantee that Azure Databricks will always execute some configured number of separate notebook-runs in parallel:<\/p>\n<pre class=\"lang:scala decode:true \">\/\/ define some way to generate a sequence of workloads to run\r\nval jobArguments = ???\r\n\r\n\/\/ define the name of the Azure Databricks notebook to run\r\nval notebookToRun = ???\r\n\r\n\/\/ define maximum number of jobs to run in parallel\r\nval totalJobs = ???\r\n\r\nimport java.util.concurrent.Executors\r\nimport scala.concurrent.{Await, ExecutionContext, Future}\r\nimport scala.concurrent.duration.Duration\r\n\r\n\/\/ look up required context for parallel run calls\r\nval context = dbutils.notebook.getContext()\r\n\r\n\/\/ create threadpool for parallel runs\r\nimplicit val executionContext = ExecutionContext.fromExecutorService(\r\n  Executors.newFixedThreadPool(totalJobs))\r\n\r\ntry {\r\n  val futures = jobArguments.zipWithIndex.map { case (args, 
i) =&gt;\r\n    Future({\r\n      \/\/ ensure thread knows about databricks context\r\n      dbutils.notebook.setContext(context)\r\n\r\n      \/\/ assign each job to one of totalJobs separate scheduler pools\r\n      sc.setLocalProperty(\"spark.scheduler.pool\", s\"pool${i % totalJobs}\")\r\n\r\n      \/\/ start the job in the scheduler pool\r\n      dbutils.notebook.run(notebookToRun, timeoutSeconds = 0, args)\r\n    })}\r\n\r\n  \/\/ wait for all the jobs to finish processing\r\n  Await.result(Future.sequence(futures), atMost = Duration.Inf)\r\n} finally {\r\n  \/\/ clean up the threadpool\r\n  executionContext.shutdownNow()\r\n}<\/pre>\n<p>The code above is somewhat\u00a0more involved than the parallel collections approach but offers two key benefits. Firstly, the use of a dedicated threadpool guarantees that there are always the configured number of jobs executing in parallel regardless of the number of cores on the Spark driver machine. Additionally, by setting explicit Spark fair scheduling pools for each of the invoked jobs, we were able to guarantee that Spark will truly run the notebooks in parallel on equally sized slices of the cluster.<\/p>\n<p>Using the code above, if we were to create a cluster with 40 workers and set the number of parallel jobs to 4, then each individual job will utilize 10 workers in the cluster. 
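<\/p>\n<p>To see why the dedicated threadpool bounds concurrency at exactly the configured number of jobs, the driver pattern can be exercised outside of Databricks with a self-contained sketch in plain Scala. The names below (<code>ParallelRunSketch<\/code>, <code>runNotebook<\/code>) are hypothetical: the stub merely stands in for the IO-bound dbutils.notebook.run call, and the sc.setLocalProperty pool assignment, which only exists on a live cluster, is left as a comment:<\/p>

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelRunSketch {
  // track how many simulated "notebook runs" are in flight at once,
  // so the concurrency cap can be observed
  private val inFlight = new AtomicInteger(0)
  val maxObserved = new AtomicInteger(0)

  // hypothetical stand-in for dbutils.notebook.run, which only exists
  // inside a Databricks notebook; any IO-bound call behaves similarly
  def runNotebook(day: String): String = {
    val now = inFlight.incrementAndGet()
    maxObserved.updateAndGet(m => math.max(m, now))
    Thread.sleep(50) // simulate a long-running, IO-bound notebook
    inFlight.decrementAndGet()
    s"done: $day"
  }

  def runAll(days: Seq[String], totalJobs: Int): Seq[String] = {
    // a fixed-size pool caps concurrency at exactly totalJobs,
    // regardless of how many cores the driver machine has
    val executor = Executors.newFixedThreadPool(totalJobs)
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(executor)
    try {
      val futures = days.zipWithIndex.map { case (day, i) =>
        Future {
          // on Databricks we would also pin each run to its own fair
          // scheduler pool here, e.g.:
          // sc.setLocalProperty("spark.scheduler.pool", s"pool${i % totalJobs}")
          runNotebook(day)
        }
      }
      // Future.sequence preserves submission order in the results
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally {
      executor.shutdownNow()
    }
  }
}
```

<p>Running <code>runAll<\/code> over eight days with totalJobs set to 4 never has more than four runs in flight at once, no matter how many cores the machine has\u00a0\u2013\u00a0mirroring the behavior of the driver notebook above.<\/p>\n<p>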
If implemented correctly, the stages tab in the cluster\u2019s Spark UI will look similar to Figure 2 below, which shows that there are 4 concurrently executing sets of Spark tasks on separate scheduler pools in the cluster.<\/p>\n<p><figure id=\"attachment_9980\" aria-labelledby=\"figcaption_attachment_9980\" class=\"wp-caption aligncenter\" ><img decoding=\"async\" class=\"size-large wp-image-9980\" src=\"https:\/\/devblogs.microsoft.com\/cse\/wp-content\/uploads\/sites\/55\/2020\/03\/databricks-parallelism-spark-ui-stages.png\" alt=\"\" width=\"1024\" height=\"436\" \/><figcaption id=\"figcaption_attachment_9980\" class=\"wp-caption-text\">Figure 2: Spark UI in Azure Databricks showing four distinct fair scheduler pools running Spark tasks in parallel (highlighted in orange).<\/figcaption><\/figure><\/p>\n<p>As shown in Figure 3 below, the fair scheduler approach provided great performance improvements. However, determining the optimal number of jobs to run for a given\u00a0workload whenever the cluster size changed would have been a non-trivial time overhead for JetBlue. Instead, we ran a benchmark similar to Figure 1 to\u00a0determine the inflection point after which adding more workers to our Spark job didn&#8217;t improve the processing time anymore. 
We were then able to use this information to dynamically compute the best number of jobs to run in parallel on our cluster:<\/p>\n<pre class=\"lang:scala decode:true\">\/\/ define the number of workers per job\r\nval workersPerJob = ???\r\n\r\n\/\/ look up the number of workers in the cluster\r\nval workersAvailable = sc.getExecutorMemoryStatus.size\r\n\r\n\/\/ determine number of jobs we can run each with the desired worker count\r\nval totalJobs = workersAvailable \/ workersPerJob<\/pre>\n<p>This approach worked well for JetBlue since we noticed that all the individual Spark jobs were roughly uniform in processing time and so distributing them evenly among the cluster would lead to optimal performance.<\/p>\n<p>Figure 3 below shows a comparison of the various Spark parallelism approaches described throughout this section. We can see that by using a threadpool, Spark fair scheduler pools, and automatic determination of the number of jobs to run on the cluster, we managed to reduce the runtime to one-third of what it was when running all jobs sequentially on one large cluster.<\/p>\n<p><figure id=\"attachment_9979\" aria-labelledby=\"figcaption_attachment_9979\" class=\"wp-caption aligncenter\" ><img decoding=\"async\" class=\"aligncenter size-full wp-image-10578\" src=\"https:\/\/devblogs.microsoft.com\/cse\/wp-content\/uploads\/sites\/55\/2019\/01\/databricks-parallelism-approaches-processing-time.png\" alt=\"Image databricks parallelism approaches processing time\" width=\"676\" height=\"335\" srcset=\"https:\/\/devblogs.microsoft.com\/ise\/wp-content\/uploads\/sites\/55\/2019\/01\/databricks-parallelism-approaches-processing-time.png 676w, https:\/\/devblogs.microsoft.com\/ise\/wp-content\/uploads\/sites\/55\/2019\/01\/databricks-parallelism-approaches-processing-time-300x149.png 300w\" sizes=\"(max-width: 676px) 100vw, 676px\" \/><figcaption id=\"figcaption_attachment_9979\" class=\"wp-caption-text\"><br \/>Figure 3: Comparison of Spark parallelism 
techniques. Note that using an approach based on fair scheduler pools enables us to more effectively\u00a0leverage larger clusters for parallel workloads.<\/figcaption><\/figure><\/p>\n<h3>Limitations of Parallel Spark Notebook Tasks<\/h3>\n<p>Note that all code included in the sections above makes use of the dbutils.notebook.run API in Azure Databricks. At the time of writing with the dbutils API at jar version <a href=\"https:\/\/mvnrepository.com\/artifact\/com.databricks\/dbutils-api_2.11\/0.0.3\">dbutils-api 0.0.3<\/a>, the code only works when run in the context of\u00a0an Azure Databricks notebook and will fail to compile if included in a class library jar attached to the cluster.<\/p>\n<p>The best-performing approaches described in the previous section require Spark fair scheduler pools to be enabled on your cluster. You can double-check that this is the case by executing the following snippet:<\/p>\n<pre class=\"lang:scala decode:true \">\/\/ must return \"FAIR\"\r\nspark.conf.get(\"spark.scheduler.mode\")<\/pre>\n<p>Furthermore, note that while the approaches described in this article do\u00a0make it easy to accelerate Spark workloads on larger cluster sizes by leveraging parallelism, it remains important to keep in mind that for some applications the gains in processing speed may not be worth the increases in cost resulting from the use of larger cluster sizes. The techniques outlined in this article provide us with a tool to trade off larger cluster sizes for shorter processing times and it&#8217;s up to each specific use-case to determine the optimal balance between urgency and cost. Additionally, we must\u00a0also realize that the speedups resulting from the techniques are not unbounded. 
For instance, if the Spark jobs read from external storage\u00a0\u2013\u00a0such as a database or cloud object storage system via HDFS\u00a0\u2013\u00a0eventually the number of concurrent machines reading from the storage may exceed the configured throughput of the external system. This will lead to the jobs slowing down in aggregate.<\/p>\n<h2>Conclusions and Next Steps<\/h2>\n<p>In this article, we presented an approach to run multiple Spark jobs in parallel on an Azure Databricks cluster by leveraging threadpools and Spark fair scheduler pools. This enabled us to reduce the time to compute JetBlue\u2019s business metrics threefold. The approach described in the article can be leveraged to run any notebooks-based workload in parallel on Azure Databricks. We welcome you to give the technique a try and let us know your results in the comments below!<\/p>\n<h3>Resources<\/h3>\n<ul>\n<li><a href=\"https:\/\/1drv.ms\/x\/s!AtLn8ELpA_G9gQJVXXInLQgvVoPI\">Parallel notebook workloads benchmark code and raw results<\/a><\/li>\n<li><a href=\"https:\/\/gist.github.com\/c-w\/5cbf03f9578aaf45f4eb8c04664d24d1\">Github gist for the parallel Spark jobs recipe described in this article<\/a><\/li>\n<li><a href=\"https:\/\/spark.apache.org\/docs\/2.4.0\/job-scheduling.html#scheduling-within-an-application\">Detailed information on Spark fair scheduler pools<\/a><\/li>\n<li><a href=\"https:\/\/docs.databricks.com\/user-guide\/notebooks\/notebook-workflows.html\">More information about Databricks notebook workflows<\/a><\/li>\n<li><a href=\"https:\/\/azure.microsoft.com\/en-us\/services\/databricks\/\">Getting started with Azure Databricks<\/a><\/li>\n<\/ul>\n<p>&nbsp;<\/p>\n","protected":false},"excerpt":{"rendered":"<p>This article walks through the development of a technique for running Spark jobs in parallel on Azure Databricks. 
The technique enabled us to reduce the processing times for JetBlue&#8217;s reporting threefold while keeping the business logic implementation straightforward. The technique can be re-used for any notebooks-based Spark workload on Azure Databricks.<\/p>\n","protected":false},"author":21408,"featured_media":10580,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[11],"tags":[144,320,333],"class_list":["post-9978","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-big-data","tag-databricks","tag-scala","tag-spark"],"acf":[],"blog_post_summary":"<p>This article walks through the development of a technique for running Spark jobs in parallel on Azure Databricks. The technique enabled us to reduce the processing times for JetBlue&#8217;s reporting threefold while keeping the business logic implementation straightforward. The technique can be re-used for any notebooks-based Spark workload on Azure 
Databricks.<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts\/9978","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/users\/21408"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/comments?post=9978"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/posts\/9978\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/media\/10580"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/media?parent=9978"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/categories?post=9978"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/ise\/wp-json\/wp\/v2\/tags?post=9978"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}