{"id":318,"date":"2019-12-10T03:58:03","date_gmt":"2019-12-10T11:58:03","guid":{"rendered":"http:\/\/devblogs.microsoft.com\/cosmosdb\/?p=318"},"modified":"2019-12-10T03:58:03","modified_gmt":"2019-12-10T11:58:03","slug":"migrating-relational-data-with-one-to-few-relationships-into-azure-cosmos-db-sql-api","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/cosmosdb\/migrating-relational-data-with-one-to-few-relationships-into-azure-cosmos-db-sql-api\/","title":{"rendered":"Migrating Relational Data with one-to-few relationships into Azure Cosmos DB SQL API"},"content":{"rendered":"<p>Migrating relational data into <a href=\"https:\/\/docs.microsoft.com\/en-us\/azure\/cosmos-db\/create-sql-api-dotnet\">Azure Cosmos DB SQL API<\/a> requires certain modelling considerations that differ from relational databases. We discuss the important SQI API modelling concepts in our guidance on <a href=\"https:\/\/docs.microsoft.com\/en-us\/azure\/cosmos-db\/modeling-data\">Data modelling in Azure Cosmos DB<\/a>.<\/p>\n<p>What follows is a sample for migrating data where one-to-few relationships exist (see <a href=\"https:\/\/docs.microsoft.com\/en-us\/azure\/cosmos-db\/modeling-data#when-to-embed\">when to embed data<\/a> in the above guidance). The sample uses the <a href=\"https:\/\/docs.microsoft.com\/en-gb\/azure\/cosmos-db\/spark-connector\">Azure Cosmos DB Spark Connector<\/a>. For more guidance on other migration options, please see <a href=\"https:\/\/docs.microsoft.com\/en-us\/azure\/cosmos-db\/cosmosdb-migrationchoices\">Options to migrate data into Cosmos DB<\/a>.<\/p>\n<p>&nbsp;<\/p>\n<p><span style=\"font-size: 14pt;\"><b>Order and Order Details<\/b><\/span><\/p>\n<p>Here we are considering a simple order system where each order can have multiple detail lines. In this scenario, the relationship is not unbounded, and there is a limited number of detail lines that may exist for a given order. We can consider this a <b>one-to-few<\/b> relationship. This is a good candidate for denormalizion. Typically denormalized data models provide better read performance in distributed databases, since we will minimise the need to read across data partitions.<b><\/b><\/p>\n<p>&nbsp;<\/p>\n<p><img decoding=\"async\" class=\"aligncenter wp-image-319 size-full\" src=\"http:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2019\/12\/orders.png\" alt=\"\" width=\"601\" height=\"376\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2019\/12\/orders.png 601w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2019\/12\/orders-300x188.png 300w\" sizes=\"(max-width: 601px) 100vw, 601px\" \/><\/p>\n<p>&nbsp;<\/p>\n<p>To migrate and merge order details into denormalized documents for each order, we will use <a href=\"https:\/\/azure.microsoft.com\/en-in\/services\/databricks\/\">Azure Databricks<\/a>, an Azure managed service for <a href=\"https:\/\/en.wikipedia.org\/wiki\/Apache_Spark\">Apache Spark<\/a>. To configure the Cosmos DB Spark Connector in Databricks, you can follow the guidance in the Databricks documentation for <a href=\"https:\/\/docs.databricks.com\/data\/data-sources\/azure\/cosmosdb-connector.html\">Azure Cosmos DB<\/a>. When this is done, you are ready to <a href=\"https:\/\/docs.databricks.com\/notebooks\/notebooks-manage.html\">create a new Python Notebook<\/a>. 
Start by configuring the source and target database connections in the first cell:<\/p>\n<p>&nbsp;<\/p>\n<pre title=\"PySpark connections config\" class=\"theme:github lang:python decode:true \">import uuid\r\nimport pyspark.sql.functions as F\r\nfrom pyspark.sql.functions import col\r\nfrom pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType\r\n\r\n#JDBC connect details for SQL Server database\r\njdbcHostname = \"jdbcHostname\"\r\njdbcDatabase = \"OrdersDB\"\r\njdbcUsername = \"jdbcUsername\"\r\njdbcPassword = \"jdbcPassword\"\r\njdbcPort = \"1433\"\r\n\r\nconnectionProperties = {\r\n  \"user\" : jdbcUsername,\r\n  \"password\" : jdbcPassword,\r\n  \"driver\" : \"com.microsoft.sqlserver.jdbc.SQLServerDriver\"\r\n}\r\njdbcUrl = \"jdbc:sqlserver:\/\/{0}:{1};database={2};user={3};password={4}\".format(jdbcHostname, jdbcPort, jdbcDatabase, jdbcUsername, jdbcPassword)\r\n\r\n#Connect details for Target Azure Cosmos DB SQL API account\r\nwriteConfig = {\r\n    \"Endpoint\": \"Endpoint\",\r\n    \"Masterkey\": \"Masterkey\",\r\n    \"Database\": \"OrdersDB\",\r\n    \"Collection\": \"Orders\",\r\n    \"Upsert\": \"true\"\r\n}<\/pre>\n<p>&nbsp;<\/p>\n<p>Next, <a href=\"https:\/\/docs.databricks.com\/notebooks\/notebooks-use.html#add-a-cell\">add a new cell<\/a>. Here, we will query the source database (in this case SQL Server) for both the order and order detail records, putting the results into <a href=\"https:\/\/databricks.com\/blog\/2015\/02\/17\/introducing-dataframes-in-spark-for-large-scale-data-science.html\">Spark DataFrames<\/a>. We will also create a list containing all the order ids, and a thread pool for parallel operations:<\/p>\n<p>&nbsp;<\/p>\n<pre title=\"Create DataFrames for source data\" class=\"theme:github lang:python decode:true\">import json\r\nimport uuid\r\nimport pyspark.sql.functions as F\r\nfrom pyspark.sql.functions import col, lit\r\nfrom pyspark.sql.types import *\r\nfrom multiprocessing.pool import ThreadPool\r\n\r\n#get all orders\r\norders = sqlContext.read.jdbc(url=jdbcUrl, table=\"orders\", properties=connectionProperties)\r\n\r\n#get all order details\r\norderdetails = sqlContext.read.jdbc(url=jdbcUrl, table=\"orderdetails\", properties=connectionProperties)\r\n\r\n#get all OrderId values to pass to map function\r\norderids = orders.select('OrderId').collect()\r\n\r\n#create thread pool big enough to process merge of details to orders in parallel\r\npool = ThreadPool(10)<\/pre>\n<p>&nbsp;<\/p>
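<p>One optional tweak worth noting here (our own addition, not part of the original sample): the merge function defined below filters the orders and orderdetails DataFrames once per order, so caching them can save Spark from re-reading the source tables from SQL Server on every call, provided the data fits in cluster memory:<\/p>\n<p>&nbsp;<\/p>\n<pre title=\"Optional: cache the source DataFrames\" class=\"theme:github lang:python decode:true\">#optional optimisation (assumes the source data fits in cluster memory):\r\n#the merge function below filters these DataFrames once per order, and\r\n#caching avoids re-reading from SQL Server on every call\r\norders.cache()\r\norderdetails.cache()<\/pre>\n<p>&nbsp;<\/p>\n<p>Below this, create a function for writing Orders into the target SQL API collection. 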
This function will filter all order details for the given order id, convert them into a JSON array, and insert the array into a JSON document that we will then write into the target SQL API collection for that order:<\/p>\n<p>&nbsp;<\/p>\n<pre title=\"Write Orders Function\" class=\"theme:github lang:python decode:true\">def writeOrder(orderid):\r\n  #filter the order on current value passed from map function\r\n  order = orders.filter(orders['OrderId'] == orderid[0])\r\n  \r\n  #set id to be a uuid\r\n  order = order.withColumn(\"id\", lit(str(uuid.uuid1())))\r\n  \r\n  #add a placeholder details field to the order dataframe (populated below if the order has detail lines)\r\n  order = order.withColumn(\"details\", lit(''))\r\n  \r\n  #filter order details dataframe to get details we want to merge into the order document\r\n  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])\r\n  \r\n  #convert dataframe to pandas\r\n  orderpandas = order.toPandas()\r\n  \r\n  #convert the order dataframe to json and remove enclosing brackets\r\n  orderjson = orderpandas.to_json(orient='records', force_ascii=False)\r\n  orderjson = orderjson[1:-1]\r\n  \r\n  #convert orderjson to a dictionary so we can set the details element with order details later\r\n  orderjsondata = json.loads(orderjson)\r\n  \r\n  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter\r\n  if (orderdetailsgroup.count() != 0):\r\n    #convert orderdetailsgroup to pandas dataframe to work better with json\r\n    orderdetailsgroup = orderdetailsgroup.toPandas()\r\n    \r\n    #convert orderdetailsgroup to json string\r\n    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)\r\n    \r\n    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records\r\n    jsonstring = json.loads(jsonstring)\r\n    \r\n    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order\r\n    orderjsondata['details'] = jsonstring\r\n \r\n  #convert dictionary to json\r\n  orderjsondata = json.dumps(orderjsondata)\r\n\r\n  #read the json into spark dataframe\r\n  df = spark.read.json(sc.parallelize([orderjsondata]))\r\n  \r\n  #write the dataframe (a single order record with its order details merged in) to cosmos db using the spark connector\r\n  #https:\/\/docs.microsoft.com\/en-us\/azure\/cosmos-db\/spark-connector\r\n  df.write.format(\"com.microsoft.azure.cosmosdb.spark\").mode(\"append\").options(**writeConfig).save()<\/pre>\n<p>&nbsp;<\/p>\n<p>Finally, we call the function above from a map on the thread pool, passing in the list of order ids we collected earlier, so that the orders are processed and written in parallel:<\/p>\n<p>&nbsp;<\/p>\n<pre title=\"Write Orders to Cosmos DB in parallel\" class=\"theme:github lang:python decode:true \">#map order details to orders in parallel using the above function\r\npool.map(writeOrder, orderids)<\/pre>\n<p>&nbsp;<\/p>
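<p>As a side note, the same denormalization can also be expressed as a single set-based Spark job, avoiding the per-order round trips entirely. The sketch below is our own variation rather than part of the original sample; it assumes the same column names shown above, and orders with no detail lines will end up with a null details field rather than an empty string:<\/p>\n<p>&nbsp;<\/p>\n<pre title=\"Alternative: set-based merge (sketch)\" class=\"theme:github lang:python decode:true\">#nest each order's detail rows into an array column per OrderId\r\nnesteddetails = (orderdetails\r\n  .groupBy(\"OrderId\")\r\n  .agg(F.collect_list(F.struct(\"UnitPrice\", \"Discount\", \"Quantity\", \"ProductId\", \"OrderId\")).alias(\"details\")))\r\n\r\n#join the nested details back onto the orders and generate an id per document\r\ndenormalized = (orders\r\n  .join(nesteddetails, \"OrderId\", \"left\")\r\n  .withColumn(\"id\", F.expr(\"uuid()\")))\r\n\r\n#write all denormalized orders to Cosmos DB in a single pass\r\ndenormalized.write.format(\"com.microsoft.azure.cosmosdb.spark\").mode(\"append\").options(**writeConfig).save()<\/pre>\n<p>&nbsp;<\/p>\n<p>Either way, you should end up with records like the below for each order written to Cosmos DB, containing a JSON array of order details:<\/p>\n<p>&nbsp;<\/p>\n<pre title=\"Denormalized Document Record\" class=\"theme:github lang:js decode:true\">{\r\n    \"OrderId\": 4,\r\n    \"CustomerID\": 123,\r\n    \"EmployeeID\": 1574669833243,\r\n    \"OrderDate\": 1574669833243,\r\n    \"RequiredDate\": 1574669833243,\r\n    \"ShippedDate\": 1574669833243,\r\n    \"details\": [\r\n        {\r\n            \"UnitPrice\": 7.99,\r\n            \"Discount\": 4,\r\n            \"Quantity\": 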
1,\r\n            \"ProductId\": 227,\r\n            \"OrderId\": 4\r\n        },\r\n        {\r\n            \"UnitPrice\": 7.99,\r\n            \"Discount\": 8,\r\n            \"Quantity\": 1,\r\n            \"ProductId\": 308266,\r\n            \"OrderId\": 4\r\n        }\r\n    ],\r\n    \"id\": \"6ff9ca92-19cf-11ea-967a-00163e4c20f5\",\r\n    \"_rid\": \"VdgtAK23OMAFAAAAAAAAAA==\",\r\n    \"_self\": \"dbs\/VdgtAA==\/colls\/VdgtAK23OMA=\/docs\/VdgtAK23OMAFAAAAAAAAAA==\/\",\r\n    \"_etag\": \"\\\"270035d4-0000-1100-0000-5ded16b60000\\\"\",\r\n    \"_attachments\": \"attachments\/\",\r\n    \"_ts\": 1575818934\r\n}<\/pre>\n<p>You can either run the Notebook as a one-off migration, or, if the data needs to be moved regularly, you can <a href=\"https:\/\/docs.microsoft.com\/en-us\/azure\/databricks\/notebooks\/notebooks-manage#--schedule-a-notebook\">schedule the Notebook in Databricks<\/a>. If the migration is part of a complex data movement pipeline, you can <a href=\"https:\/\/docs.microsoft.com\/en-us\/azure\/data-factory\/transform-data-using-databricks-notebook\">include the Notebook as part of a pipeline in Azure Data Factory<\/a>.<\/p>
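<p>If you prefer to set the schedule up programmatically, the Databricks Jobs REST API can create a scheduled job that runs the Notebook. The snippet below is a hypothetical sketch rather than part of the original sample: the workspace URL, access token, cluster id, notebook path and cron expression are all placeholders to substitute for your environment:<\/p>\n<p>&nbsp;<\/p>\n<pre title=\"Schedule the Notebook via the Jobs API (sketch)\" class=\"theme:github lang:python decode:true\">import requests\r\n\r\n#placeholders - substitute your own workspace URL, personal access token,\r\n#cluster id and notebook path before running\r\nresp = requests.post(\r\n  \"https:\/\/YOUR-WORKSPACE.azuredatabricks.net\/api\/2.0\/jobs\/create\",\r\n  headers={\"Authorization\": \"Bearer YOUR-ACCESS-TOKEN\"},\r\n  json={\r\n    \"name\": \"orders-migration\",\r\n    \"existing_cluster_id\": \"YOUR-CLUSTER-ID\",\r\n    \"notebook_task\": {\"notebook_path\": \"\/Users\/you@example.com\/orders-migration\"},\r\n    \"schedule\": {\r\n      \"quartz_cron_expression\": \"0 0 2 * * ?\",  #every day at 02:00\r\n      \"timezone_id\": \"UTC\"\r\n    }\r\n  })\r\n\r\n#raise on failure; on success the response contains the new job_id\r\nresp.raise_for_status()\r\nprint(resp.json())<\/pre>\n<p>&nbsp;<\/p>\n<h2><span style=\"font-size: 14pt;\"><strong>Get started<\/strong><\/span><\/h2>\n<p>Create a new account using the Azure Portal, ARM template or Azure CLI and connect to it using your favourite tools.\u00a0Stay up-to-date on the latest Azure\u202f<a href=\"https:\/\/twitter.com\/search?q=%23cosmosdb\">#CosmosDB<\/a>\u202fnews and features by following us on Twitter\u202f<a href=\"https:\/\/twitter.com\/azurecosmosdb\">@AzureCosmosDB<\/a>. We are really excited to see what you will build with Azure Cosmos DB!<\/p>\n<h2><span style=\"font-size: 14pt;\">About Azure Cosmos DB<\/span><\/h2>\n<p><a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/introduction\">Azure Cosmos DB<\/a> is a globally distributed, multi-model database service that enables you to read and write data from any Azure region. 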
It offers <a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/distribute-data-globally\">turnkey global distribution<\/a>, guarantees\u00a0<a href=\"https:\/\/azure.microsoft.com\/support\/legal\/sla\/cosmos-db\/v1_3\/\">single-digit millisecond<\/a> latency at the 99<sup>th<\/sup>\u00a0percentile and 99.999 percent\u00a0<a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/high-availability\">high availability<\/a>, and provides <a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/scaling-throughput\">elastic scaling<\/a>\u00a0of\u00a0<a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/request-units\">throughput and storage<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>A worked-through sample for migrating relational data with one-to-few relationships into Azure Cosmos DB SQL API.<\/p>\n","protected":false},"author":9387,"featured_media":61,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[19],"tags":[289,287,291,286,293,292,285,290],"class_list":["post-318","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-tips-and-tricks","tag-azure","tag-cosmos-db","tag-databricks","tag-migration","tag-one-to-few","tag-pyspark","tag-relational","tag-spark"],"acf":[],"blog_post_summary":"<p>A worked-through sample for migrating relational data with one-to-few relationships into Azure Cosmos DB SQL API.<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts\/318","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/users\/9387"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/comments?post=318"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts\/318\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/media\/61"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/media?parent=318"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/categories?post=318"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/tags?post=318"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}