Migrating Relational Data with one-to-few relationships into Azure Cosmos DB SQL API

Theo

Migrating relational data into Azure Cosmos DB SQL API requires certain modelling considerations that differ from those for relational databases. We discuss the important SQL API modelling concepts in our guidance on Data modelling in Azure Cosmos DB.

What follows is a sample for migrating data where one-to-few relationships exist (see when to embed data in the above guidance). The sample uses the Azure Cosmos DB Spark Connector. For more guidance on other migration options, please see Options to migrate data into Cosmos DB.

 

Order and Order Details

Here we consider a simple order system in which each order can have multiple detail lines. The relationship is bounded: only a limited number of detail lines can exist for a given order, so we can treat it as a one-to-few relationship. This makes it a good candidate for denormalization. Denormalized data models typically provide better read performance in distributed databases, because they minimise the need to read across data partitions.
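
To make the target shape concrete, here is a minimal plain-Python sketch of the denormalization, independent of Spark. The function name `embed_details` and the sample rows are illustrative assumptions and are not part of the migration notebook that follows.

```python
from collections import defaultdict

def embed_details(orders, order_details):
    """Return one document per order with its detail rows embedded as a list."""
    # Group detail rows by the order they belong to.
    details_by_order = defaultdict(list)
    for detail in order_details:
        details_by_order[detail["OrderId"]].append(detail)

    # Copy each order and attach its (possibly empty) list of details.
    return [
        {**order, "details": details_by_order.get(order["OrderId"], [])}
        for order in orders
    ]

# Hypothetical sample rows standing in for the relational tables.
orders = [{"OrderId": 4, "CustomerID": 123}, {"OrderId": 5, "CustomerID": 456}]
order_details = [
    {"OrderId": 4, "ProductId": 227, "Quantity": 1},
    {"OrderId": 4, "ProductId": 308266, "Quantity": 1},
]

documents = embed_details(orders, order_details)
```

Each resulting document carries everything needed to display an order, so a read touches a single item rather than joining two tables.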

 

 

To migrate and merge order details into denormalized documents for each order, we will use Azure Databricks, an Azure managed service for Apache Spark. To configure the Cosmos DB Spark Connector in Databricks, you can follow the guidance in the Databricks documentation for Azure Cosmos DB. When this is done, you are ready to create a new Python Notebook. Start by configuring the source and target database connections in the first cell:

 

import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType

#JDBC connect details for SQL Server database
jdbcHostname = "jdbcHostname"
jdbcDatabase = "OrdersDB"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"

connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
#credentials are supplied through connectionProperties, so they are not repeated in the URL
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

#Connect details for Target Azure Cosmos DB SQL API account
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "OrdersDB",
    "Collection": "Orders",
    "Upsert": "true"
}

 

Next, add a new cell. Here, we will query the source database (in this case SQL Server) for both the order and order detail records, putting the results into Spark DataFrames. We will also create a list containing all the order ids, and a thread pool for parallel operations:

 

#imports used by this cell and by the writeOrder function below
import json
import uuid
from pyspark.sql.functions import lit
from multiprocessing.pool import ThreadPool

#get all orders
orders = sqlContext.read.jdbc(url=jdbcUrl, table="orders", properties=connectionProperties)

#get all order details
orderdetails = sqlContext.read.jdbc(url=jdbcUrl, table="orderdetails", properties=connectionProperties)

#get all OrderId values to pass to map function 
orderids = orders.select('OrderId').collect()

#create thread pool big enough to process merge of details to orders in parallel
pool = ThreadPool(10)

 

Below this, create a function for writing Orders into the target SQL API collection. This function will filter all order details for the given order id, convert them into a JSON array, and insert the array into a JSON document that we will write into the target SQL API Collection for that order:

 

def writeOrder(orderid):
  #filter the order on current value passed from map function
  order = orders.filter(orders['OrderId'] == orderid[0])
  
  #set id to be a uuid
  order = order.withColumn("id", lit(str(uuid.uuid1())))
  
  #add details field to order dataframe
  order = order.withColumn("details", lit(''))
  
  #filter order details dataframe to get details we want to merge into the order document
  orderdetailsgroup = orderdetails.filter(orderdetails['OrderId'] == orderid[0])
  
  #convert dataframe to pandas
  orderpandas = order.toPandas()
  
  #convert the order dataframe to json and remove enclosing brackets
  orderjson = orderpandas.to_json(orient='records', force_ascii=False)
  orderjson = orderjson[1:-1] 
  
  #convert orderjson to a dictionary so we can set the details element with order details later
  orderjsondata = json.loads(orderjson)
  
  
  #convert orderdetailsgroup dataframe to json, but only if details were returned from the earlier filter
  if (orderdetailsgroup.count() !=0):
    #convert orderdetailsgroup to pandas dataframe to work better with json
    orderdetailsgroup = orderdetailsgroup.toPandas()
    
    #convert orderdetailsgroup to json string
    jsonstring = orderdetailsgroup.to_json(orient='records', force_ascii=False)
    
    #convert jsonstring to dictionary to ensure correct encoding and no corrupt records
    jsonstring = json.loads(jsonstring)
    
    #set details json element in orderjsondata to jsonstring which contains orderdetailsgroup - this merges order details into the order 
    orderjsondata['details'] = jsonstring
 
  #convert dictionary to json
  orderjsondata = json.dumps(orderjsondata)

  #read the json into spark dataframe
  df = spark.read.json(sc.parallelize([orderjsondata]))
  
  #write the dataframe (a single order record with its order details merged in) to Cosmos DB using the Spark connector
  #https://docs.microsoft.com/en-us/azure/cosmos-db/spark-connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()
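
One thing to be aware of: `uuid.uuid1()` produces a different value on every call, so if the notebook is run again each order receives a new `id`, and the `Upsert` setting will not match the documents written earlier. If re-runs are likely, one option is to derive the `id` from the `OrderId` instead. The sketch below is an assumption about how you might do this and is not part of the original sample; `ORDER_NAMESPACE` and `stable_order_id` are hypothetical names.

```python
import uuid

# Hypothetical namespace; any fixed UUID works as long as it never changes.
ORDER_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "orders.example.com")

def stable_order_id(order_id):
    """Derive a repeatable document id from the relational OrderId."""
    # uuid5 hashes the namespace and name, so the same input gives the same id.
    return str(uuid.uuid5(ORDER_NAMESPACE, str(order_id)))
```

In `writeOrder`, this would mean passing `stable_order_id(orderid[0])` to `lit(...)` in place of `str(uuid.uuid1())`.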

 

Finally, we will call the above using a map function on the thread pool, to execute in parallel, passing in the list of order ids we created earlier:

 

#map order details to orders in parallel using the above function
pool.map(writeOrder, orderids)
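
`pool.map` re-raises an exception thrown by a worker without telling you which order it came from. If you would rather collect failures and retry them afterwards, a wrapper along the following lines is one possibility. This is a minimal sketch: `safe_write` and `run_all` are hypothetical helpers, and `write_fn` stands in for `writeOrder`.

```python
from multiprocessing.pool import ThreadPool

def safe_write(write_fn, orderid):
    """Call write_fn for one order and report the outcome instead of raising."""
    try:
        write_fn(orderid)
        return (orderid, None)
    except Exception as exc:  # broad on purpose: every failure should be recorded
        return (orderid, exc)

def run_all(write_fn, orderids, threads=10):
    """Run write_fn over all order ids in parallel and return only the failures."""
    with ThreadPool(threads) as pool:
        results = pool.map(lambda oid: safe_write(write_fn, oid), orderids)
    return [(oid, err) for oid, err in results if err is not None]
```

Calling `failures = run_all(writeOrder, orderids)` would then return a list of `(orderid, exception)` pairs for the orders that still need to be written.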

 

You should end up with a record like the following for each order written to Cosmos DB, containing a JSON array of order details:

 

{
    "OrderId": 4,
    "CustomerID": 123,
    "EmployeeID": 1574669833243,
    "OrderDate": 1574669833243,
    "RequiredDate": 1574669833243,
    "ShippedDate": 1574669833243,
    "details": [
        {
            "UnitPrice": 7.99,
            "Discount": 4,
            "Quantity": 1,
            "ProductId": 227,
            "OrderId": 4
        },
        {
            "UnitPrice": 7.99,
            "Discount": 8,
            "Quantity": 1,
            "ProductId": 308266,
            "OrderId": 4
        }
    ],
    "id": "6ff9ca92-19cf-11ea-967a-00163e4c20f5",
    "_rid": "VdgtAK23OMAFAAAAAAAAAA==",
    "_self": "dbs/VdgtAA==/colls/VdgtAK23OMA=/docs/VdgtAK23OMAFAAAAAAAAAA==/",
    "_etag": "\"270035d4-0000-1100-0000-5ded16b60000\"",
    "_attachments": "attachments/",
    "_ts": 1575818934
}
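
Note that the date fields are stored as integers. This is because pandas `to_json` writes datetimes as epoch milliseconds by default. If you prefer ISO 8601 strings in the stored documents, you could pass `date_format='iso'` to `to_json`, or convert the values in the dictionary before it is written. The following is a minimal sketch of the latter, assuming the values are in UTC; the helper name `epoch_ms_to_iso` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Reference point for epoch-based timestamps.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def epoch_ms_to_iso(ms):
    """Convert epoch milliseconds (assumed UTC) to an ISO 8601 string."""
    # Integer timedelta arithmetic avoids floating-point rounding of the milliseconds.
    return (EPOCH + timedelta(milliseconds=ms)).isoformat()

# Example using the OrderDate value from the sample document above.
order_date = epoch_ms_to_iso(1574669833243)
```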

You can either run the Notebook as a one-off migration, or if the data needs to be moved regularly, you can schedule the Notebook in Databricks. If the migration is part of a complex data movement pipeline, you can include the Notebook as part of a pipeline in Azure Data Factory.

Get started

Create a new account using the Azure Portal, ARM template or Azure CLI and connect to it using your favourite tools. Stay up-to-date on the latest Azure #CosmosDB news and features by following us on Twitter @AzureCosmosDB. We are really excited to see what you will build with Azure Cosmos DB!

About Azure Cosmos DB

Azure Cosmos DB is a globally distributed, multi-model database service that enables you to read and write data from any Azure region. It offers turnkey global distribution, guarantees single-digit millisecond latency at the 99th percentile, 99.999 percent high availability, with elastic scaling of throughput and storage.
