Bulk Updates with Optimistic Concurrency Control

Avatar

Theo

In this blog we are going to focus on a scenario where you want to do many updates to vertices in a graph stored in Azure Cosmos DB using the Gremlin API, where each update needs to account for other updates that might occur at the same time.

A great feature about the Core (SQL) API in Azure Cosmos DB is that it is fully interoperable with the Gremlin API with respect to CRUD operations, which means you can use all of the powerful features in Core (SQL) API and it’s SDKs to manipulate vertices and edges in a graph.

A feature that has been added to the .NET SDK in Azure Cosmos DB since version 3 is bulk support, which we will also explore here. The bulk support feature in the SDK is a replacement for the older bulk executor library, and now fully supports all CRUD operations (insert, update, read, delete).

The full code sample for the below can be found here.

 

Bulk Scenario

Imagine you have a graph which models devices across a fleet of cars. Suppose you want to run a bulk operation to increase a temperature setting on each device by a certain amount for all devices in a particular fleet, but you are aware that independent operations could also be setting the temperature setting at the same time, and you want to make sure that this temperature state is not overwritten in your bulk update. In other words, you want to ensure all devices have their temperature setting increased by the same amount, relative to the value that was present in the database at the time of bulk update, without missing any interim updates. First, lets model the vertices in the graph as classes:

 

    public class VertexDevice
    {
        [JsonProperty("id")]
        internal string Id { get; set; }

        [JsonProperty("pk")]
        internal string pk { get; set; }

        [JsonProperty("label")]
        internal string label { get; set; }

        [JsonProperty("model")]
        public List<VertexProperty> model { get; set; }

        [JsonProperty("temp")]
        public List<VertexPropertyNumber> temp { get; set; }

        [JsonProperty("status")]
        public List<VertexProperty> status { get; set; }

        [JsonProperty("_rid")]
        internal string _rid { get; set; }

        [JsonProperty("_self")]
        internal string _self { get; set; }

        [JsonProperty("_etag")]
        internal string _etag { get; set; }

        [JsonProperty("_attachments")]
        internal string _attachments { get; set; }

        [JsonProperty("_ts")]
        internal string _ts { get; set; }
    }

    public class VertexProperty
    {
        [JsonProperty("id")]
        public string Id { get; set; }

        [JsonProperty("_value")]
        public string _value { get; set; }
    }


    public class VertexPropertyNumber
    {
        [JsonProperty("id")]
        public string Id { get; set; }

        [JsonProperty("_value")]
        public int _value { get; set; }
    }

 

Next, lets create a basic console app, with the dependencies and instance variables you are going to use (fill in endpoint and auth key with the appropriate values from your Cosmos account):

 

namespace BulkGraphUpdates

{
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Cosmos;
    using Newtonsoft.Json;
    class Program
    {

        //From Keys in portal...
        static string endpoint = ".NET SDK URI";
        static string authKey = "PRIMARY KEY";
        Container container = client.GetContainer("graphdb", "graph");

        //create cosmos client with bulk support enabled
        static CosmosClient client = new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
        List<String> faileddocs = new List<String>();
        List<VertexDevice> vertices;
    }
}

Note that when we create the CosmosClient, we also enable the SDK’s built-in bulk execution capability:

new CosmosClientOptions() { AllowBulkExecution = true }

This allows the operations we will show later on to be executed using bulk support. You can either put the vertex classes we created earlier into the above as inner classes, or have them as separate files and import the classes. Now, let’s create a private method, that is going to do each individual update.

private async Task Update(VertexDevice device)
  {
      try
      {
          //check IfMatchEtag to ensure that no changes have occured since initial read of the document
          await container.ReplaceItemAsync(device, device.Id, new PartitionKey("fleet1"), new ItemRequestOptions { IfMatchEtag = device._etag });
      }
      catch (CosmosException cre)
      {
          if (cre.StatusCode.ToString().Equals("PreconditionFailed"))
          {                   
              this.faileddocs.Add(device.Id);
          }                
      }
  }

Notice here that we are doing a check on the _etag property. This will detect if the record (document) we are giving during update matches the current document stored in the database, which is how we implement optimistic concurrency control. If the check fails, we will add the id of the document to the faileddocs list.

We will also create a private method to re-read the documents that fail the _etag check, and add them to the vertices list for re-processing:

private async Task Read(string docid)
{
    ItemResponse<VertexDevice> response = await container.ReadItemAsync<VertexDevice>(partitionKey: new PartitionKey("fleet1"), id: docid);
    vertices.Add(response);
}

 

Next, let’s add a BulkUpdateGraphAsync method:

private async Task BulkUpdateGraphAsync()
{           
    //if we get failed docs, it means the optimistic concurrency control test failed due to another process updating the doc at the same time, and
    //we want to check get the temperature to ensure we are increasing all device temperatures by the same amount
    if (this.faileddocs.Count != 0)
    {
        this.vertices = new List<VertexDevice>();
        foreach (String docid in this.faileddocs)
        {
            //add tasks to concurrent tasks list
            concurrentTasks.Add(Read(docid));               
        }
        //bulk read each failed doc, adding each to vertices to be re-processed.
        await Task.WhenAll(concurrentTasks);
    }

    else
    {
        //get vertices in 'fleet1' as we want to increase all of them by 20
        QueryDefinition query = new QueryDefinition("SELECT * FROM c where c.pk = 'fleet1'");

        vertices = new List<VertexDevice>();
        using (FeedIterator<VertexDevice> resultSet = container.GetItemQueryIterator<VertexDevice>(
            queryDefinition: query,
            requestOptions: new QueryRequestOptions()
            {
                //these devices have been modelled with partition key value of "fleet1"
                PartitionKey = new PartitionKey("fleet1"),
            }))

            while (resultSet.HasMoreResults)
            {
                FeedResponse<VertexDevice> response = await resultSet.ReadNextAsync();
                vertices.AddRange(response);
            }
    }

    //re-set failed docs to null here to stop infinite recursion
    faileddocs = new List<String>();

    //test optimistic concurrency by updating a doc within a 30 second window to force IfMatchEtag to fail
    Console.WriteLine("waiting 30 seconds to simulate concurrent updates before applying bulk update......");
    Thread.Sleep(30000);

    {
        //set up concurrentTasks for bulk upsert
        List<Task> concurrentTasks = new List<Task>();

        foreach (VertexDevice device in vertices)
        {
            //increase temperature setting by 20
            device.temp[0]._value = device.temp[0]._value + 20;

            //we put a safeguard here to ensure no updates that take temp above 140
            if (device.temp[0]._value < 140)
            {
                //add tasks to concurrent tasks list
                concurrentTasks.Add(Update(device));
            }
        }

        //bulk update the graph objects in fleet1, increasing temperature of all devices by 20.
        await Task.WhenAll(concurrentTasks);

        if (this.faileddocs.Count != 0)
        {
            //recursive method call to re-apply change where replaceItem failed IfMatchEtag 
            Console.WriteLine("Retrying docs where IfMatchEtag failed...");
            await BulkUpdateGraphAsync();
        }
    }
}

 

There are a few things happening here. Lets take each part, one by one.

First, we check to see if there are any failed documents:

if (this.faileddocs.Count != 0)

This will only return true if any _etag test on the initial attempt to bulk update all the vertices failed. In this scenario, we are going to add all failed docs to the list of vertices to be re-processed. Notice that when we re-read the failed docs to get the latest state, we make use of bulk support:

List<Task> concurrentTasks = new List<Task>();
this.vertices = new List<VertexDevice>();
foreach (String docid in this.faileddocs)
{
    //add tasks to concurrent tasks list
    concurrentTasks.Add(Read(docid));              
}
//bulk read each failed doc, adding each to vertices to be re-processed.
await Task.WhenAll(concurrentTasks);

 

Next, if there are no failed docs, then this is the first attempt at running the bulk update, so we will attempt to get all vertices for “field 1” (vertices have been partitioned using attribute “pk” which contains this value), and increase the temperature for each by 20. Note that the bulk update approach is exactly the same as bulk read above:

//set up concurrentTasks for bulk upsert
List<Task> concurrentTasks = new List<Task>();
foreach (VertexDevice device in vertices)
{
    //increase temperature setting by 20
    device.temp[0]._value = device.temp[0]._value + 20;

    //we put a safeguard here to ensure no updates that take temp above 140
    if (device.temp[0]._value < 140)
    {
        //add tasks to concurrent tasks list
        concurrentTasks.Add(Update(device));
    }
}
//bulk update the graph objects in fleet1, increasing temperature of all devices by 20. 
await Task.WhenAll(concurrentTasks);

 

Note also the recursive call to the method to ensure that we keep retrying the update until there are no _etag failures:

if (this.faileddocs.Count != 0)
{
    //recursive method call to re-apply change where replaceItem failed IfMatchEtag 
    Console.WriteLine("Retrying docs where IfMatchEtag failed...");
    await BulkUpdateGraphAsync();
}

Finally, note that there is an artificial wait added so that you can force the _etag test to fail by updating a record during the bulk update (you would obviously remove this in a production implementation, this is only for testing and demo purposes):

 //test optimistic concurrency by updating a doc within a 30 second window to force IfMatchEtag to fail
 Console.WriteLine("waiting 30 seconds to simulate concurrent updates before applying bulk update......");
 Thread.Sleep(30000);


Now we can add a main method to test this out:

static async Task Main(string[] args)
{
    try
    {
        //We are going to update all the devices in partition "fleet1", to increase temperature by 20
        Console.WriteLine("Bulk updating nodes in graph...");
        await new Program().BulkUpdateGraphAsync();
        Console.WriteLine("Update done!");
    }
    catch (Exception e)
    {
        Console.WriteLine("Exception: " + e);
    }
}

 

As long as you have modelled your graph to contain attributes as per the above classes, you can now run this sample and test the concurrency control check by doing an update to the “temp” value of any of the vertices in your graph, during the 30 second window.

Image editgraph

Image bulkUpdates

In this blog we have considered bulk updates, with optimistic concurrency control, to vertices in a graph stored in Gremlin API. Of course, the principles here apply to Core (SQL) API as well.

To learn more:

Check out some of these resources from the official documentation: 

3 comments

Leave a comment

  • Andrey Lopatin
    Andrey Lopatin

    Hello and thank you for your article, I appreciate it.
    However, I am confused a little bit – which library should we use to work with CosmosDb Gremlin? Azure SDK or Gremlin.Net? I saw a few GH discussions where it was recommended to go with Gremlin.Net. Even “Get started” examples are based on Gremlin.Net lib

    Thank you

    • Avatar
      Theo van KraayMicrosoft employee

      Hi Andrey! You can use either! For Gremlin specific functions, i.e. traversals and other graph related tasks etc, you should of course use Gremlin. However, in scenarios where you want to do pure CRUD (create, read, update, delete) operations against either vertex or edge, you have the option of using the SQL API as well. This opens up the possibility of modelling vertices using a document structure, with hierarchy, and even geo-spatial co-ordinates, which you can then query using SQL API (not covered in this blog). Or, in this case, using Cosmos DB’s native bulk support to do a large number of updates against objects in the graph, for which the gremlin driver will work less efficiently.