{"id":400,"date":"2020-03-05T07:21:47","date_gmt":"2020-03-05T15:21:47","guid":{"rendered":"http:\/\/devblogs.microsoft.com\/cosmosdb\/?p=400"},"modified":"2020-04-20T11:42:02","modified_gmt":"2020-04-20T18:42:02","slug":"migrating-relational-data-into-cosmos-db-using-azure-data-factory-and-azure-databricks","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/cosmosdb\/migrating-relational-data-into-cosmos-db-using-azure-data-factory-and-azure-databricks\/","title":{"rendered":"Migrating Relational Data into Azure Cosmos DB using Azure Data Factory and Azure Databricks"},"content":{"rendered":"<p>Organizations migrating relational data to <a href=\"http:\/\/www.AzureCosmosDB.com\">Azure Cosmos DB<\/a> meet different challenges, from moving large amounts of data, to performing the transformations required to properly store the data in a format that will provide the performance required.\nThe first step on this type of migrations is to come up with the non-relational model that will accommodate all the relational data and support existing workload characteristics. Luckily, there are <a href=\"https:\/\/docs.microsoft.com\/en-us\/azure\/cosmos-db\/migrate-relational-to-cosmos-db-sql-api#azure-data-factory\">good guidelines<\/a> on how to approach this modeling effort.\nThere are also good examples on how to perform the transformation using different approaches. I used <a href=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/migrating-relational-data-with-one-to-few-relationships-into-azure-cosmos-db-sql-api\/\">Theo\u2019s<\/a> as inspiration for mine. My approach leverages Azure Data Factory to perform efficient parallel copying and PySpark to perform the required transformations at scale.<\/p>\n<h2>One-To-Many Relationships using the Embedding Approach<\/h2>\n<p>In some One-to-Many scenarios, the recommended approach is to Embed the many side into the one side, thus eliminating the need for joins. 
A common example is when we have a master\/detail pair of tables like Order Header and Order Detail.<\/p>\n<p><a href=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailresults-scaled.jpg\"><img decoding=\"async\" class=\"aligncenter size-large wp-image-411\" src=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailresults-1024x366.jpg\" alt=\"Order Header and Order Detail results\" width=\"640\" height=\"229\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailresults-1024x366.jpg 1024w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailresults-300x107.jpg 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailresults-768x274.jpg 768w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailresults-1536x549.jpg 1536w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailresults-scaled.jpg 2048w\" sizes=\"(max-width: 640px) 100vw, 640px\" \/><\/a><\/p>\n<p>Here we have one record for the Order Header and three corresponding records for the Order Detail. In a relational world, we are required to join these two tables (by SalesOrderID) to get a complete picture of sales data. 
When using the embedded approach to migrate this data to Azure Cosmos DB (Core SQL API), the data will look like a single document with data for the order, and an array of elements representing data for the detail.<\/p>\n<p><a href=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsDocument.jpg\"><img decoding=\"async\" class=\"aligncenter wp-image-412\" src=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsDocument-654x1024.jpg\" alt=\"Order Header and Order Detail As Document\" width=\"400\" height=\"627\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsDocument-654x1024.jpg 654w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsDocument-192x300.jpg 192w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsDocument-768x1203.jpg 768w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsDocument-981x1536.jpg 981w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsDocument.jpg 1045w\" sizes=\"(max-width: 400px) 100vw, 400px\" \/><\/a><\/p>\n<p>Notice that I left the SalesOrderID element on the embedded documents just for reference. 
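<\/p>
<p>To make the target shape concrete, here is the embedded document sketched as a plain Python dictionary. The field names mirror the SalesOrderHeader and SalesOrderDetail columns shown above; the values are illustrative only, not taken from the actual dataset:<\/p>

```python
# Hypothetical sketch of the embedded document shape (values are made up).
order_document = {
    "SalesOrderID": 71774,
    "OrderDate": "2014-06-01T00:00:00",
    "TotalDue": 972.78,
    # One array element per corresponding SalesOrderDetail row:
    "Details": [
        {"SalesOrderDetailID": 110562, "OrderQty": 1, "ProductID": 836},
        {"SalesOrderDetailID": 110563, "OrderQty": 1, "ProductID": 822},
    ],
}

print(len(order_document["Details"]))  # 2
```

<p>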
The final implementation will remove these elements as they are no longer necessary.<\/p>\n<p>&nbsp;<\/p>\n<h2>The Solution<\/h2>\n<p>The solution has a single ADF Pipeline with three activities: one to bring the relational data to ADLS, another to transform the data, and a final one to load the data into Azure Cosmos DB.<\/p>\n<p><a href=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/ADFSQLtoNoSQLUsingDatabricksPipeline.jpg\"><img decoding=\"async\" class=\"aligncenter size-full wp-image-422\" src=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/ADFSQLtoNoSQLUsingDatabricksPipeline.jpg\" alt=\"ADF SQL to NoSQL Using Databricks Pipeline\" width=\"723\" height=\"165\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/ADFSQLtoNoSQLUsingDatabricksPipeline.jpg 723w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/ADFSQLtoNoSQLUsingDatabricksPipeline-300x68.jpg 300w\" sizes=\"(max-width: 723px) 100vw, 723px\" \/><\/a><\/p>\n<h3>Step 1: Copy Data from Relational Sources to ADLS Gen 2<\/h3>\n<p>The first step uses the <a href=\"https:\/\/docs.microsoft.com\/en-us\/azure\/data-factory\/copy-activity-overview\">Azure Data Factory (ADF) Copy activity<\/a> to copy the data from its original relational sources to a staging file system in Azure Data Lake Storage (ADLS) Gen 2. I chose the ADF Copy activity because it allows me to source data from a large and growing number of sources in a secure, reliable, and scalable way. ADF is also used to orchestrate the entire pipeline, as it provides monitoring capabilities.\nA single ForEach activity loops through a Copy activity to move the necessary datasets from the relational source. This can also be accomplished using two separate Copy activities. 
This example uses Azure SQL Database as the relational data source.\nI used Parquet as the destination file format to preserve the incoming data types and speed up PySpark processing downstream.<\/p>\n<p><a href=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/ADFCopySQLDatatoADLS.jpg\"><img decoding=\"async\" class=\"aligncenter size-full wp-image-413\" src=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/ADFCopySQLDatatoADLS.jpg\" alt=\"ADF Copy SQL Data to ADLS\" width=\"914\" height=\"566\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/ADFCopySQLDatatoADLS.jpg 914w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/ADFCopySQLDatatoADLS-300x186.jpg 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/ADFCopySQLDatatoADLS-768x476.jpg 768w\" sizes=\"(max-width: 914px) 100vw, 914px\" \/><\/a><\/p>\n<p>At the end of this step, we will have two files on ADLS, one representing Sales Order Header and another representing Sales Order Detail.<\/p>\n<h3>Step 2: Transform Relational to Non-relational using the Embedded Approach<\/h3>\n<p>Now that we have our files in ADLS, we can use Azure Databricks to code a PySpark process to perform the required transformations. We need to merge the two incoming master\/detail datasets into a single collection of documents.\nBefore you continue, make sure you have followed the steps required to grant your Databricks cluster access to the ADLS Gen2 filesystem. You can find those detailed steps <a href=\"https:\/\/docs.databricks.com\/data\/data-sources\/azure\/azure-datalake-gen2.html\">here<\/a>. For this example, I&#8217;ve mounted the ADLS folder into a <strong>\/mnt\/staging<\/strong> path.\nI used a single Notebook that is called from ADF using the Azure Databricks Notebook activity. 
First, we read the two Parquet files into Dataframes:<\/p>\n<pre class=\"theme:neon lang:python decode:true\" title=\"Load Order Header and Order Detail to Dataframes\">orderHeaderDF = spark.read.parquet('\/mnt\/staging\/SalesOrderHeader.parquet')\r\norderDetailDF = spark.read.parquet('\/mnt\/staging\/SalesOrderDetail.parquet')\r\n<\/pre>\n<p>Since we used Parquet, Spark automatically recognizes the structure and data types of each dataset. Next, we transform the Order Detail Dataframe so it is ready to use as an embedded document. The code first creates a new structure element using all the elements (attributes) from the Order Detail Dataframe (except SalesOrderID). Then it groups the records by SalesOrderID, collecting all the records with the same SalesOrderID into an array. This reduces the granularity of the original Sales Order Detail Dataframe.<\/p>\n<pre class=\"theme:neon lang:python decode:true\" title=\"Embed Order Detail records into array, grouping by SalesOrderID\">from pyspark.sql.functions import *\r\n\r\nembeddedElementName = 'Details'\r\njoinColumnName = 'SalesOrderID'\r\norderDetailEmbeddedDF = (orderDetailDF.withColumn(embeddedElementName\r\n         ,(struct([col(colName) for colName in orderDetailDF.columns if colName != joinColumnName ])))\r\n         .select(joinColumnName,embeddedElementName)\r\n         .groupby(joinColumnName)\r\n         .agg(collect_list(embeddedElementName).alias(embeddedElementName)))<\/pre>\n<p>Once the transformation is completed, the new Order Detail Dataframe has the right structure to be used in the final step. 
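<\/p>
<p>Stripped of Spark, the same grouping logic can be sketched in plain Python to show what the struct plus collect_list combination produces (the detail rows below are hypothetical):<\/p>

```python
from collections import defaultdict

# Hypothetical detail rows; in the notebook these come from the Parquet file.
detail_rows = [
    {"SalesOrderID": 1, "SalesOrderDetailID": 10, "OrderQty": 2},
    {"SalesOrderID": 1, "SalesOrderDetailID": 11, "OrderQty": 1},
    {"SalesOrderID": 2, "SalesOrderDetailID": 12, "OrderQty": 5},
]

# Group by SalesOrderID, collecting the remaining columns into an array,
# mirroring struct(...) + groupby + collect_list in the PySpark code above.
grouped = defaultdict(list)
for row in detail_rows:
    detail = {k: v for k, v in row.items() if k != "SalesOrderID"}
    grouped[row["SalesOrderID"]].append(detail)

# grouped[1] now holds two embedded detail records, grouped[2] holds one,
# and SalesOrderID appears only as the grouping key.
```

<p>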
Notice that the column used for aggregation is the same one I will use to join back to the Sales Order Header Dataframe (SalesOrderID).<\/p>\n<p><a href=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/SparkEmbeddingStructure.jpg\"><img decoding=\"async\" class=\"aligncenter size-full wp-image-427\" src=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/SparkEmbeddingStructure.jpg\" alt=\"Spark Embedding Structure\" width=\"797\" height=\"223\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/SparkEmbeddingStructure.jpg 797w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/SparkEmbeddingStructure-300x84.jpg 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/SparkEmbeddingStructure-768x215.jpg 768w\" sizes=\"(max-width: 797px) 100vw, 797px\" \/><\/a><\/p>\n<p>The final transformation first joins Order Header with the newly formatted Order Detail Dataframe using SalesOrderID as the join condition. Then it selects all the elements from the Order Header Dataframe and only the Details element from the new Order Detail Dataframe, as we don&#8217;t need two copies of SalesOrderID in the same document.<\/p>\n<pre class=\"theme:neon lang:python decode:true \" title=\"Join OrderHeader with OrderDetailEmbedded using SalesOrderID - Select only Details Column from OrderDetail \">from pyspark.sql.functions import col\r\n\r\norderCombinedDF = (orderHeaderDF.alias('OH')\r\n                   .join(orderDetailEmbeddedDF.alias('OD')\r\n                         ,col('OH.'+ joinColumnName) == col('OD.' + joinColumnName))\r\n                   .select([col('OH.'+colName) for colName in orderHeaderDF.columns] \r\n                           + [col('OD.' + embeddedElementName)]) )<\/pre>\n<p>As you can see, the code is dynamic and will work with any number of columns in both the Header and Detail Dataframes. 
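<\/p>
<p>The write step itself is not shown in the notebook excerpt above; here is a minimal sketch, assuming the same \/mnt\/staging mount (the OrdersCombined folder name is hypothetical):<\/p>

```python
# Sketch only: assumes a Databricks/Spark session and the ADLS mount described above.
# Spark writes a folder of JSON part files, which the ADF Copy activity can read.
(orderCombinedDF
    .write
    .mode("overwrite")
    .json("/mnt/staging/OrdersCombined"))
```

<p>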
The final Dataframe, ready to be loaded to Cosmos DB, is written to a JSON file on ADLS. Its final schema looks like this:<\/p>\n<p><a href=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocument.jpg\"><img decoding=\"async\" class=\"aligncenter size-full wp-image-431\" src=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocument.jpg\" alt=\"Order Header and Order Detail As Combined Document\" width=\"388\" height=\"609\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocument.jpg 388w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocument-191x300.jpg 191w\" sizes=\"(max-width: 388px) 100vw, 388px\" \/><\/a><\/p>\n<p>&nbsp;<\/p>\n<h3>Step 3: Load the Transformed Data into Azure Cosmos DB<\/h3>\n<p>Finally, I use an ADF Copy activity to load the JSON file created in the previous step into an Azure Cosmos DB collection. There is no need to specify any mapping. 
Once loaded into the collection, documents will look like this in the Azure Cosmos DB Data Explorer:<\/p>\n<p><a href=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocumentinCosmosDB.jpg\"><img decoding=\"async\" class=\"aligncenter size-large wp-image-433\" src=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocumentinCosmosDB-1024x840.jpg\" alt=\"Order Header and Order Detail As Combined Document in Cosmos DB\" width=\"640\" height=\"525\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocumentinCosmosDB-1024x840.jpg 1024w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocumentinCosmosDB-300x246.jpg 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocumentinCosmosDB-768x630.jpg 768w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/03\/OrderHeaderandOrderDetailAsCombinedDocumentinCosmosDB.jpg 1034w\" sizes=\"(max-width: 640px) 100vw, 640px\" \/><\/a><\/p>\n<p>&nbsp;<\/p>\n<h2>Implementation Notes<\/h2>\n<p>The Spark code is not completely generic. However, it should be relatively easy to add parameters to the notebook to specify the input and output files, and the names of the fields used for joins and embedding.<\/p>\n<p>The Spark code is short and could eventually be replaced with a native Azure Data Factory Mapping Data Flow operator, providing a simpler, easier-to-maintain solution.<\/p>\n<p>I used Azure Databricks to run the PySpark code and Azure Data Factory to copy data and orchestrate the entire process. 
Once available, this could be accomplished by using only Azure Synapse.<\/p>\n<h2><span style=\"font-size: 14pt;\"><strong>Get started<\/strong><\/span><\/h2>\n<p>Create a new account using the Azure Portal, ARM template or Azure CLI and connect to it using your favorite tools. Stay up-to-date on the latest Azure\u202f<a href=\"https:\/\/twitter.com\/search?q=%23cosmosdb\">#CosmosDB<\/a>\u202fnews and features by following us on Twitter\u202f<a href=\"https:\/\/twitter.com\/azurecosmosdb\">@AzureCosmosDB<\/a>. We are really excited to see what you will build with Azure Cosmos DB!<\/p>\n<h2><span style=\"font-size: 14pt;\">About Azure Cosmos DB<\/span><\/h2>\n<p><a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/introduction\">Azure Cosmos DB<\/a> is a globally distributed, multi-model database service that enables you to read and write data from any Azure region. It offers <a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/distribute-data-globally\">turnkey global distribution<\/a>, guarantees\u00a0<a href=\"https:\/\/azure.microsoft.com\/support\/legal\/sla\/cosmos-db\/v1_3\/\">single-digit millisecond<\/a> latency at the 99<sup>th<\/sup>\u00a0percentile, 99.999 percent\u00a0<a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/high-availability\">high availability<\/a>, with <a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/scaling-throughput\">elastic scaling<\/a>\u00a0of\u00a0<a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/request-units\">throughput and storage<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Organizations migrating relational data to Azure Cosmos DB meet different challenges, from moving large amounts of data, to performing the transformations required to properly store the data in a format that will provide the performance required. 
The first step on this type of migrations is to come up with the non-relational model that will accommodate [&hellip;]<\/p>\n","protected":false},"author":20289,"featured_media":404,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[19],"tags":[289,287,344,291,286,293,292,285,290],"class_list":["post-400","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-tips-and-tricks","tag-azure","tag-cosmos-db","tag-data-factory","tag-databricks","tag-migration","tag-one-to-few","tag-pyspark","tag-relational","tag-spark"],"acf":[],"blog_post_summary":"<p>Organizations migrating relational data to Azure Cosmos DB meet different challenges, from moving large amounts of data, to performing the transformations required to properly store the data in a format that will provide the performance required. The first step on this type of migrations is to come up with the non-relational model that will accommodate 
[&hellip;]<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts\/400","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/users\/20289"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/comments?post=400"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts\/400\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/media\/404"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/media?parent=400"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/categories?post=400"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/tags?post=400"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}