{"id":1675,"date":"2020-09-15T09:00:46","date_gmt":"2020-09-15T16:00:46","guid":{"rendered":"https:\/\/devblogs.microsoft.com\/cosmosdb\/?p=1675"},"modified":"2020-09-17T16:23:12","modified_gmt":"2020-09-17T23:23:12","slug":"bulk-updates-with-optimistic-concurrency-control","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/cosmosdb\/bulk-updates-with-optimistic-concurrency-control\/","title":{"rendered":"Bulk Updates with Optimistic Concurrency Control"},"content":{"rendered":"<p>In this blog we are going to focus on a scenario where you want to do many updates to vertices in a graph stored in <a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/introduction\">Azure Cosmos DB<\/a> using the <a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/graph-introduction\">Gremlin API<\/a>, where each update needs to account for other updates that might occur at the same time.<\/p>\n<p>A great feature about the Core (SQL) API in Azure Cosmos DB is that it is fully interoperable with the Gremlin API with respect to CRUD operations, which means you can use all of the powerful features in Core (SQL) API and it&#8217;s SDKs to manipulate vertices and edges in a graph.<\/p>\n<p>A feature that has been added to the .NET SDK in Azure Cosmos DB since version 3 is <a href=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/introducing-bulk-support-in-the-net-sdk\/\">bulk support<\/a>, which we will also explore here. The bulk support feature in the SDK is a replacement for the older bulk executor library, and now fully supports all CRUD operations (insert, update, read, delete).<\/p>\n<p>The full code sample for the below can be found <a href=\"https:\/\/github.com\/TheovanKraay\/cosmos-graph-bulk-update\/blob\/master\/BulkGraphUpdates\/Program.cs\">here<\/a>.<\/p>\n<p>&nbsp;<\/p>\n<h5>Bulk Scenario<\/h5>\n<p>Imagine you have a graph which models devices across a fleet of cars. Suppose you want to run a bulk operation to increase a temperature setting on each device by a certain amount for all devices in a particular fleet, but you are aware that independent operations could also be setting the temperature setting at the same time, and you want to make sure that this temperature state is not overwritten in your bulk update. In other words, you want to ensure all devices have their temperature setting increased by the same amount, relative to the value that was present in the database at the time of bulk update, without missing any interim updates. First, lets model the vertices in the graph as classes:<\/p>\n<p>&nbsp;<\/p>\n<pre><span style=\"font-size: 10pt;\"> \u00a0\u00a0 public class VertexDevice\r\n\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"id\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 internal string Id { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"pk\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 internal string pk { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"label\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 internal string label { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"model\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 public List&lt;VertexProperty&gt; model { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"temp\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 public List&lt;VertexPropertyNumber&gt; temp { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"status\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 public List&lt;VertexProperty&gt; status { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"_rid\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 internal string _rid { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"_self\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 internal string _self { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"_etag\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 internal string _etag { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"_attachments\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 internal string _attachments { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"_ts\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 internal string _ts { get; set; }\r\n\u00a0\u00a0\u00a0 }\r\n\r\n\u00a0\u00a0\u00a0 public class VertexProperty\r\n\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"id\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 public string Id { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"_value\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 public string _value { get; set; }\r\n\u00a0\u00a0\u00a0 }\r\n\r\n\r\n\u00a0\u00a0\u00a0 public class VertexPropertyNumber\r\n\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"id\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 public string Id { get; set; }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 [JsonProperty(\"_value\")]\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 public int _value { get; set; }\r\n\u00a0\u00a0\u00a0 }<\/span><\/pre>\n<p>&nbsp;<\/p>\n<p>Next, lets create a basic console app, with the dependencies and instance variables you are going to use (fill in endpoint and auth key with the appropriate values from your Cosmos account):<\/p>\n<p>&nbsp;<\/p>\n<pre><span style=\"font-size: 10pt;\">namespace BulkGraphUpdates\r\n\r\n{\r\n\u00a0\u00a0\u00a0 using System;\r\n\u00a0\u00a0\u00a0 using System.Collections.Generic;\r\n\u00a0\u00a0\u00a0 using System.Threading;\r\n\u00a0\u00a0\u00a0 using System.Threading.Tasks;\r\n\u00a0\u00a0\u00a0 using Microsoft.Azure.Cosmos;\r\n\u00a0\u00a0\u00a0 using Newtonsoft.Json;\r\n\u00a0\u00a0\u00a0 class Program\r\n\u00a0\u00a0\u00a0 {\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/From Keys in portal...\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 static string endpoint = \".NET SDK URI\";\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 static string authKey = \"PRIMARY KEY\";\r\n\u00a0\u00a0\u00a0\u00a0\u00a0 \u00a0\u00a0Container container = client.GetContainer(\"graphdb\", \"graph\");\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/create cosmos client with bulk support enabled\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 static CosmosClient client = new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });\r\n\u00a0 \u00a0\u00a0\u00a0\u00a0\u00a0\u00a0List&lt;String&gt; faileddocs = new List&lt;String&gt;();\r\n        List&lt;VertexDevice&gt; vertices;\r\n\u00a0\u00a0\u00a0 }\r\n}<\/span><\/pre>\n<p>Note that when we create the CosmosClient, we also enable the SDK&#8217;s built-in bulk execution capability:<\/p>\n<pre><span style=\"font-size: 10pt;\">new CosmosClientOptions() { AllowBulkExecution = true }<\/span><\/pre>\n<p>This allows the operations we will show later on to be executed using <a href=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/introducing-bulk-support-in-the-net-sdk\/\">bulk support<\/a>. You can either put the vertex classes we created earlier into the above as inner classes, or have them as separate files and import the classes. Now, let&#8217;s create a private method, that is going to do each individual update.<\/p>\n<pre><span style=\"font-size: 10pt;\">private async Task Update(VertexDevice device)\r\n\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0 try\r\n\u00a0\u00a0\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/check IfMatchEtag to ensure that no changes have occured since initial read of the document\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 await container.ReplaceItemAsync(device, device.Id, new PartitionKey(\"fleet1\"), new ItemRequestOptions { IfMatchEtag = device._etag });\r\n\u00a0\u00a0\u00a0 \u00a0\u00a0}\r\n\u00a0\u00a0\u00a0\u00a0\u00a0 catch (CosmosException cre)\r\n\u00a0\u00a0\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 if (cre.StatusCode.ToString().Equals(\"PreconditionFailed\"))\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 {\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 this.faileddocs.Add(device.Id);\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 }\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \u00a0\r\n\u00a0\u00a0\u00a0\u00a0\u00a0 }\r\n\u00a0 }\r\n<\/span><\/pre>\n<p>Notice here that we are doing a check on the _etag property. This will detect if the record (document) we are giving during update matches the current document stored in the database, which is how we implement <a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/database-transactions-optimistic-concurrency#optimistic-concurrency-control\">optimistic concurrency control<\/a>. If the check fails, we will add the id of the document to the faileddocs list.<\/p>\n<p>We will also create a private method to re-read the documents that fail the _etag check, and add them to the vertices list for re-processing:<\/p>\n<pre><span style=\"font-size: 10pt;\">private async Task Read(string docid)<\/span>\r\n<span style=\"font-size: 10pt;\">{<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 ItemResponse&lt;VertexDevice&gt; response = await container.ReadItemAsync&lt;VertexDevice&gt;(partitionKey: new PartitionKey(\"fleet1\"), id: docid);<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 vertices.Add(response);<\/span>\r\n<span style=\"font-size: 10pt;\">}<\/span><\/pre>\n<p>&nbsp;<\/p>\n<p>Next, let&#8217;s add a BulkUpdateGraphAsync method:<\/p>\n<pre><span style=\"font-size: 10pt;\">private async Task BulkUpdateGraphAsync()\r\n{\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\r\n\u00a0\u00a0\u00a0 \/\/if we get failed docs, it means the optimistic concurrency control test failed due to another process updating the doc at the same time, and\r\n\u00a0\u00a0\u00a0 \/\/we want to check get the temperature to ensure we are increasing all device temperatures by the same amount\r\n\u00a0\u00a0\u00a0 if (this.faileddocs.Count != 0)\r\n\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0this.vertices = new List&lt;VertexDevice&gt;();\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 foreach (String docid in this.faileddocs)\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/add tasks to concurrent tasks list\r\n            concurrentTasks.Add(Read(docid));  \u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 }\r\n        \/\/bulk read each failed doc, adding each to vertices to be re-processed.\r\n        await Task.WhenAll(concurrentTasks);\r\n\u00a0\u00a0\u00a0 }\r\n\r\n\u00a0\u00a0\u00a0 else\r\n\u00a0\u00a0\u00a0\u00a0{\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/get vertices in 'fleet1' as we want to increase all of them by 20\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 QueryDefinition query = new QueryDefinition(\"SELECT * FROM c where c.pk = 'fleet1'\");\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 vertices = new List&lt;VertexDevice&gt;();\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 using (FeedIterator&lt;VertexDevice&gt; resultSet = container.GetItemQueryIterator&lt;VertexDevice&gt;(\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 queryDefinition: query,\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 requestOptions: new QueryRequestOptions()\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \u00a0{\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/these devices have been modelled with partition key value of \"fleet1\"\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 PartitionKey = new PartitionKey(\"fleet1\"),\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 }))\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 while (resultSet.HasMoreResults)\r\n\u00a0\u00a0\u00a0\u00a0 \u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0{\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 FeedResponse&lt;VertexDevice&gt; response = await resultSet.ReadNextAsync();\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 vertices.AddRange(response);\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 }\r\n\u00a0\u00a0\u00a0 }\r\n\r\n\u00a0\u00a0\u00a0 \/\/re-set failed docs to null here to stop infinite recursion\r\n\u00a0\u00a0\u00a0 faileddocs = new List&lt;String&gt;();\r\n\r\n\u00a0\u00a0\u00a0 \/\/test optimistic concurrency by updating a doc within a 30 second window to force IfMatchEtag to fail\r\n\u00a0\u00a0\u00a0 Console.WriteLine(\"waiting 30 seconds to simulate concurrent updates before applying bulk update......\");\r\n\u00a0\u00a0\u00a0 Thread.Sleep(30000);\r\n\r\n\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/set up concurrentTasks for bulk upsert\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 List&lt;Task&gt; concurrentTasks = new List&lt;Task&gt;();<\/span>\r\n<span style=\"font-size: 10pt;\">\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 foreach (VertexDevice device in vertices)\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/increase temperature setting by 20\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 device.temp[0]._value = device.temp[0]._value + 20;\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/we put a safeguard here to ensure no updates that take temp above 140\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 if (device.temp[0]._value &lt; 140)\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/add tasks to concurrent tasks list\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 concurrentTasks.Add(Update(device));\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 }\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 }\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/bulk update the graph objects in fleet1, increasing temperature of all devices by 20.\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 await Task.WhenAll(concurrentTasks);\r\n\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 if (this.faileddocs.Count != 0)\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/recursive method call to re-apply change where replaceItem failed IfMatchEtag\u00a0\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 Console.WriteLine(\"Retrying docs where IfMatchEtag failed...\");\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 await BulkUpdateGraphAsync();\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 }\r\n\u00a0\u00a0\u00a0 }\r\n}\r\n\r\n<\/span><\/pre>\n<p>&nbsp;<\/p>\n<p>There are a few things happening here. Lets take each part, one by one.<\/p>\n<p>First, we check to see if there are any failed documents:<\/p>\n<pre><span style=\"font-size: 10pt;\">if (this.faileddocs.Count != 0)\r\n\r\n<\/span><\/pre>\n<p>This will only return true if any _etag test on the initial attempt to bulk update all the vertices failed. In this scenario, we are going to add all failed docs to the list of vertices to be re-processed. Notice that when we re-read the failed docs to get the latest state, we make use of bulk support:<\/p>\n<pre><span style=\"font-size: 10pt;\">List&lt;Task&gt; concurrentTasks = new List&lt;Task&gt;();<\/span>\r\n<span style=\"font-size: 10pt;\">this.vertices = new List&lt;VertexDevice&gt;();<\/span>\r\n<span style=\"font-size: 10pt;\">foreach (String docid in this.faileddocs)<\/span>\r\n<span style=\"font-size: 10pt;\">{<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 \/\/add tasks to concurrent tasks list<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 concurrentTasks.Add(Read(docid));\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0<\/span>\r\n<span style=\"font-size: 10pt;\">}<\/span>\r\n<span style=\"font-size: 10pt;\">\/\/bulk read each failed doc, adding each to vertices to be re-processed.<\/span>\r\n<span style=\"font-size: 10pt;\">await Task.WhenAll(concurrentTasks);\r\n\r\n<\/span><\/pre>\n<p>&nbsp;<\/p>\n<p>Next, if there are no failed docs, then this is the first attempt at running the bulk update, so we will attempt to get all vertices for \u201cfield 1\u201d (vertices have been <a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/partitioning-overview\">partitioned<\/a> using attribute &#8220;pk&#8221; which contains this value), and increase the temperature for each by 20. Note that the bulk update approach is exactly the same as bulk read above:<\/p>\n<pre><span style=\"font-size: 10pt;\">\/\/set up concurrentTasks for bulk upsert<\/span>\r\n<span style=\"font-size: 10pt;\">List&lt;Task&gt; concurrentTasks = new List&lt;Task&gt;();<\/span>\r\n<span style=\"font-size: 10pt;\">foreach (VertexDevice device in vertices)<\/span>\r\n<span style=\"font-size: 10pt;\">{<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 \/\/increase temperature setting by 20<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 device.temp[0]._value = device.temp[0]._value + 20;<\/span>\r\n\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 \/\/we put a safeguard here to ensure no updates that take temp above 140<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 if (device.temp[0]._value &lt; 140)<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 {<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/add tasks to concurrent tasks list<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 concurrentTasks.Add(Update(device));<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 }<\/span>\r\n<span style=\"font-size: 10pt;\">}\r\n\/\/bulk update the graph objects in fleet1, increasing temperature of all devices by 20. \r\nawait Task.WhenAll(concurrentTasks);\r\n<\/span><\/pre>\n<p>&nbsp;<\/p>\n<p>Note also the recursive call to the method to ensure that we keep retrying the update until there are no _etag failures:<\/p>\n<pre><span style=\"font-size: 10pt;\">if (this.faileddocs.Count != 0)<\/span>\r\n<span style=\"font-size: 10pt;\">{<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 \/\/recursive method call to re-apply change where replaceItem failed IfMatchEtag\u00a0<\/span>\r\n<span style=\"font-size: 10pt;\"> \u00a0\u00a0 Console.WriteLine(\"Retrying docs where IfMatchEtag failed...\");<\/span>\r\n<span style=\"font-size: 10pt;\">  \u00a0 await BulkUpdateGraphAsync();<\/span>\r\n<span style=\"font-size: 10pt;\">}\r\n\r\n<\/span><\/pre>\n<p>Finally, note that there is an artificial wait added so that you can force the _etag test to fail by updating a record during the bulk update (you would obviously remove this in a production implementation, this is only for testing and demo purposes):<\/p>\n<pre><span style=\"font-size: 10pt;\"> \/\/test optimistic concurrency by updating a doc within a 30 second window to force IfMatchEtag to fail\r\n\u00a0Console.WriteLine(\"waiting 30 seconds to simulate concurrent updates before applying bulk update......\");\r\n\u00a0Thread.Sleep(30000);\r\n\r\n\r\n<\/span><\/pre>\n<p>Now we can add a main method to test this out:<\/p>\n<pre><span style=\"font-size: 10pt;\">static async Task Main(string[] args)\r\n{\r\n\u00a0\u00a0\u00a0 try\r\n\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 \/\/We are going to update all the devices in partition \"fleet1\", to increase temperature by 20\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 Console.WriteLine(\"Bulk updating nodes in graph...\");\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 await new Program().BulkUpdateGraphAsync();\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 Console.WriteLine(\"Update done!\");\r\n\u00a0\u00a0\u00a0 }\r\n\u00a0\u00a0\u00a0 catch (Exception e)\r\n\u00a0\u00a0\u00a0 {\r\n\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0 Console.WriteLine(\"Exception: \" + e);\r\n\u00a0\u00a0\u00a0 }\r\n}<\/span><\/pre>\n<p>&nbsp;<\/p>\n<p>As long as you have modelled your graph to contain attributes as per the above classes, you can now run this sample and test the concurrency control check by doing an update to the &#8220;temp&#8221; value of any of the vertices in your graph, during the 30 second window.<\/p>\n<p><img decoding=\"async\" class=\"alignnone wp-image-1714 size-full\" src=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/editgraph.png\" alt=\"Image editgraph\" width=\"2209\" height=\"889\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/editgraph.png 2209w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/editgraph-300x121.png 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/editgraph-1024x412.png 1024w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/editgraph-768x309.png 768w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/editgraph-1536x618.png 1536w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/editgraph-2048x824.png 2048w\" sizes=\"(max-width: 2209px) 100vw, 2209px\" \/><\/p>\n<p><img decoding=\"async\" class=\"alignnone wp-image-1676 size-full\" src=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/bulkUpdates.png\" alt=\"Image bulkUpdates\" width=\"1305\" height=\"197\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/bulkUpdates.png 1305w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/bulkUpdates-300x45.png 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/bulkUpdates-1024x155.png 1024w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2020\/09\/bulkUpdates-768x116.png 768w\" sizes=\"(max-width: 1305px) 100vw, 1305px\" \/><\/p>\n<p>In this blog we have considered bulk updates, with optimistic concurrency control, to vertices in a graph stored in Gremlin API. Of course, the principles here apply to Core (SQL) API as well.<\/p>\n<h5><b><span data-contrast=\"none\">To l<\/span><\/b><b><span data-contrast=\"none\">earn more:<\/span><\/b><\/h5>\n<p><span data-contrast=\"none\">Check out<\/span><span data-contrast=\"none\">\u00a0some of these resources from the official documentation:<\/span><span data-ccp-props=\"{}\">\u00a0<\/span><\/p>\n<ul>\n<li data-leveltext=\"\uf0b7\" data-font=\"Symbol\" data-listid=\"1\" aria-setsize=\"-1\" data-aria-posinset=\"1\" data-aria-level=\"1\"><a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/tutorial-sql-api-dotnet-bulk-import\">Using .NET Bulk Support<\/a><\/li>\n<li data-leveltext=\"\uf0b7\" data-font=\"Symbol\" data-listid=\"1\" aria-setsize=\"-1\" data-aria-posinset=\"2\" data-aria-level=\"1\"><a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/how-to-migrate-from-bulk-executor-library\"><span data-contrast=\"none\">Migrate from bulk executor to bulk support in .NET V3 SDK<\/span><\/a><\/li>\n<li data-leveltext=\"\uf0b7\" data-font=\"Symbol\" data-listid=\"1\" aria-setsize=\"-1\" data-aria-posinset=\"2\" data-aria-level=\"1\"><a href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/database-transactions-optimistic-concurrency\">Transactions and Optimistic Concurrency Control<\/a><\/li>\n<\/ul>\n","protected":false},"excerpt":{"rendered":"<p>How to support bulk updates in Azure Cosmos DB with Optimistic concurrency control. In this sample, we use Core (SQL) API to bulk update vertices in a graph hosted in Gremlin API!<\/p>\n","protected":false},"author":9387,"featured_media":1716,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[17,14,19],"tags":[1745,1751,1750,1749,1746,1744,1748,1743,1747],"class_list":["post-1675","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-gremlin-api","category-core-sql-api","category-tips-and-tricks","tag-net-sdk","tag-bulk-delete","tag-bulk-insert","tag-bulk-reads","tag-bulk-support","tag-bulk-updates","tag-etag","tag-gremlin","tag-optimistic-concurrency"],"acf":[],"blog_post_summary":"<p>How to support bulk updates in Azure Cosmos DB with Optimistic concurrency control. In this sample, we use Core (SQL) API to bulk update vertices in a graph hosted in Gremlin API!<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts\/1675","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/users\/9387"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/comments?post=1675"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts\/1675\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/media\/1716"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/media?parent=1675"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/categories?post=1675"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/tags?post=1675"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}