January 18th, 2024

Latest NoSQL Java Ecosystem Updates 2023 Q3 & Q4

Theo van Kraay
Principal Program Manager

We’re always busy adding new features, fixes, patches, and improvements to our Java-based client libraries for Azure Cosmos DB for NoSQL. In this regular blog series, we share highlights of recent updates in the last period.

 

July – December 2023 updates

 

  1. Spark 3.4 Support
  2. Throughput Control – gateway support in Spark Connector
  3. Aggressive Connection Warmup Improvements in Java SDK
  4. Query Pagination Improvements in Java SDK
  5. Patch Operation on more than 10 fields in Spark Connector
  6. Bypass Integrated Cache in Java SDK
  7. Diagnostics Thresholds support for Java SDK and Spring Data
  8. Integration of Throughput Control with Change Feed Processor – Java SDK
  9. Session Token Mismatch – Further Optimization
  10. Change Feed Processor Context for All Versions and Deletes mode in Java SDK
  11. Hierarchical Partition Key Support in Spark Connector

 

Spark 3.4 Support

Cloud-native hybrid transactional and analytical processing (HTAP) is supported in Azure Cosmos DB through Synapse Link, using the OLTP and OLAP Spark connectors, which now support Spark 3.4 as of July 2023. This includes support for TimestampNTZType, introduced in Spark 3.4.
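As a minimal sketch (assuming an existing SparkSession spark; account endpoint, key, and container names are placeholders), writing a DataFrame with a TimestampNTZType column through the connector looks roughly like this:

// TimestampNTZType (timestamp without time zone) is new in Spark 3.4
// and maps to java.time.LocalDateTime on the Java side.
StructType schema = new StructType()
        .add("id", DataTypes.StringType)
        .add("createdAt", DataTypes.TimestampNTZType);

Dataset<Row> df = spark.createDataFrame(
        Arrays.asList(RowFactory.create("id1", LocalDateTime.now())),
        schema);

df.write()
        .format("cosmos.oltp")
        .option("spark.cosmos.accountEndpoint", "https://REPLACEME.documents.azure.com:443/")
        .option("spark.cosmos.accountKey", "REPLACEME")
        .option("spark.cosmos.database", "Database")
        .option("spark.cosmos.container", "Container")
        .mode("APPEND")
        .save();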

 

Throughput Control – Gateway Mode Support in Spark Connector

In the Cosmos DB Spark Connector, throughput control helps to isolate the performance needs of applications running against a container by limiting the number of request units that can be consumed by a given Spark client. In September 2023 we added support for gateway mode; users can now enable this by setting spark.cosmos.useGatewayMode to true in their Spark config, as in the sketch below. Find out more about throughput control here.
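As a minimal sketch (option names follow the connector's configuration reference; the account settings and the DataFrame df are placeholders), a Spark write combining throughput control with gateway mode might look like this:

df.write()
        .format("cosmos.oltp")
        .option("spark.cosmos.accountEndpoint", "https://REPLACEME.documents.azure.com:443/")
        .option("spark.cosmos.accountKey", "REPLACEME")
        .option("spark.cosmos.database", "Database")
        .option("spark.cosmos.container", "Container")
        // route operations through the gateway (the capability added in September 2023)
        .option("spark.cosmos.useGatewayMode", "true")
        // throughput control group: cap this client at 90% of the container's throughput
        .option("spark.cosmos.throughputControl.enabled", "true")
        .option("spark.cosmos.throughputControl.name", "sparkIngestionGroup")
        .option("spark.cosmos.throughputControl.targetThroughputThreshold", "0.9")
        .option("spark.cosmos.throughputControl.globalControl.database", "Database")
        .option("spark.cosmos.throughputControl.globalControl.container", "ThroughputControl")
        .mode("APPEND")
        .save();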

 

Aggressive Connection Warmup Improvements in Java SDK

Earlier in February 2023 we introduced Proactive Connection Management, a feature which allows developers to warm up connections and caches for containers in both the current read region and a pre-defined number of preferred remote regions. This feature can improve tail latency in cross-region failover scenarios. In September 2023 we made further enhancements, improving the efficiency of how connections are opened during the warm-up phase.
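As a minimal sketch (endpoint, key, and region names are placeholders), warming up connections and caches for one container across the current read region plus one preferred remote region looks roughly like this:

List<CosmosContainerIdentity> containerIdentities = Arrays.asList(
        new CosmosContainerIdentity("SampleDatabase", "SampleContainer"));

CosmosContainerProactiveInitConfig proactiveInitConfig =
        new CosmosContainerProactiveInitConfigBuilder(containerIdentities)
                .setProactiveConnectionRegionsCount(2) // current read region + 1 remote region
                .build();

CosmosAsyncClient client = new CosmosClientBuilder()
        .endpoint("<account-endpoint>")
        .key("<account-key>")
        .preferredRegions(Arrays.asList("East US", "West US"))
        .directMode()
        // open connections and initialize caches before the client takes traffic
        .openConnectionsAndInitCaches(proactiveInitConfig)
        .buildAsyncClient();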

 

Query Pagination Improvements in Java SDK

We’ve made enhancements to query pagination in the Java SDK in September 2023.
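For context, query pagination in the Java SDK is driven through byPage; a minimal sketch, assuming container is an existing CosmosAsyncContainer:

CosmosPagedFlux<JsonNode> results = container.queryItems(
        "SELECT * FROM c", new CosmosQueryRequestOptions(), JsonNode.class);

results.byPage(100) // request up to 100 items per page
        .doOnNext(page -> {
            System.out.println("items in page: " + page.getResults().size());
            // page.getContinuationToken() can be persisted to resume from this point later
        })
        .blockLast();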

 

Patch Operation on more than 10 fields in Spark Connector

For the Patch API in Cosmos DB, there is a general limitation of patching only up to 10 fields per document at a time (see Partial document update – Azure Cosmos DB for NoSQL for more information). In the Spark OLTP connector we have developed a new feature that allows customers to patch documents with more than 10 columns. This removes a significant restriction for customers who use Spark for bulk ingestion and for updating their documents. By setting spark.cosmos.write.strategy to ItemBulkUpdate, users can now patch more than 10 fields in a single operation, as sketched below. To find samples for doing patch with the Cosmos DB Spark Connector, see here.
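As a minimal sketch (account settings are placeholders, and df is assumed to hold the id, the partition key column, and the columns to merge into existing documents):

df.write()
        .format("cosmos.oltp")
        .option("spark.cosmos.accountEndpoint", "https://REPLACEME.documents.azure.com:443/")
        .option("spark.cosmos.accountKey", "REPLACEME")
        .option("spark.cosmos.database", "Database")
        .option("spark.cosmos.container", "Container")
        // ItemBulkUpdate lifts the 10-field limit of the Patch API
        .option("spark.cosmos.write.strategy", "ItemBulkUpdate")
        .mode("APPEND")
        .save();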

 

Bypass Integrated Cache in Java SDK

The Azure Cosmos DB integrated cache is an in-memory cache that helps ensure manageable costs and low latency as request volume grows. The integrated cache is a read-through, write-through cache with a Least Recently Used (LRU) eviction policy. However, there are some scenarios where it is preferable to bypass the cache on a per-request basis, which you can now do as follows:

DedicatedGatewayRequestOptions dedicatedOptions = new DedicatedGatewayRequestOptions();
dedicatedOptions.setMaxIntegratedCacheStaleness(Duration.ofMinutes(2));
// skip the integrated cache entirely for requests carrying these options
dedicatedOptions.setIntegratedCacheBypassed(true);
CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
queryOptions.setDedicatedGatewayRequestOptions(dedicatedOptions);
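The options can then be attached to an individual request; for example, assuming container is an existing CosmosAsyncContainer:

container.queryItems("SELECT * FROM c", queryOptions, JsonNode.class)
        .byPage()
        .subscribe(page -> System.out.println(page.getResults().size()));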

Diagnostics Thresholds Support for Java SDK and Spring Data

Back in September 2022 we introduced the option to emit client metrics from the Azure Cosmos DB Java SDK via a Micrometer MeterRegistry, as well as doing so from the Spark connector via configuration. We have since added the ability to define diagnostics thresholds, which for very noisy applications helps limit the metrics consumed to the ones you are most interested in. Check out the documentation on instrumenting client metrics with Micrometer using Prometheus, which includes examples of how to define thresholds. We also recently enabled threshold support in the Cosmos Spring Data Client Library; just configure your CosmosClientBuilder bean as below:

@Bean
public CosmosClientBuilder cosmosClientBuilder() {
    // threshold values (e.g. pointOperationLatencyThresholdInMS) come from your application configuration
    DirectConnectionConfig directConnectionConfig = DirectConnectionConfig.getDefaultConfig();
    return new CosmosClientBuilder()
            .endpoint(properties.getUri())
            .key(properties.getKey())
            .directMode(directConnectionConfig)
            .clientTelemetryConfig(
                    new CosmosClientTelemetryConfig()
                            .diagnosticsThresholds(
                                    new CosmosDiagnosticsThresholds()
                                            .setNonPointOperationLatencyThreshold(Duration.ofMillis(nonPointOperationLatencyThresholdInMS))
                                            .setPointOperationLatencyThreshold(Duration.ofMillis(pointOperationLatencyThresholdInMS))
                                            .setPayloadSizeThreshold(payloadSizeThresholdInBytes)
                                            .setRequestChargeThreshold(requestChargeThresholdInRU))
                            // only diagnostics breaching the thresholds above are logged
                            .diagnosticsHandler(CosmosDiagnosticsHandler.DEFAULT_LOGGING_HANDLER));
}

Integration of Throughput Control with Change Feed Processor – Java SDK

During a heavy backlog of changes in the monitored container, the Change Feed Processor (CFP) will keep polling documents in order to catch up. This can cause an increase in request unit usage, which in turn can cause heavy throttling. To avoid this, we have integrated Throughput Control with the CFP. In the Java SDK, Priority-Based Execution is already integrated with Throughput Control, making it easier for customers to set low priority for CFP-based workloads and avoid the impact of throttling on other workloads running in parallel. Here’s a sample of how to define both throughput control and a priority level when processing the change feed:

ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();

ThroughputControlGroupConfig throughputControlGroupConfig =
        new ThroughputControlGroupConfigBuilder()
                .groupName("changeFeedProcessor")
                .targetThroughput(1000)
                .priorityLevel(PriorityLevel.LOW) // deprioritize CFP polling vs. other workloads
                .build();
options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);

ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .options(options)
        .handleChanges(handleChanges())
        .buildChangeFeedProcessor();
changeFeedProcessorInstance.start()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();

Session Token Mismatch – Further Optimization

In the previous period we introduced a session token mismatch optimization which allows application developers to configure hints through a SessionRetryOptions instance. This signals to the SDK whether to pin retries to the local region or move more quickly to a remote region when READ_SESSION_NOT_AVAILABLE errors are thrown. The latest change adds a minimum time spent checking whether the local region can meet session consistency before attempting to call the remote region. This gives the local region some time to catch up on replication lag.

// minimum number of retries attempted in the local region before the SDK
// moves on to a remote region; set before the client is created
int minMaxRetriesInLocalRegion = 5;
System.setProperty("COSMOS.MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED", String.valueOf(minMaxRetriesInLocalRegion));

Change Feed Processor Context for All Versions and Deletes mode in Java SDK

The Change Feed Processor (CFP) has a special mode for All Versions and Deletes, giving the user a record of each change to items in the order in which it occurred, including intermediate changes to an item between change feed reads, as well as a record of all deletes with the prior image before deletion. This change in the Java SDK adds a ChangeFeedProcessorContext, which exposes details related to a batch of changes:

public static ChangeFeedProcessor getChangeFeedProcessorForAllVersionsAndDeletesMode(String hostName, CosmosAsyncContainer feedContainer, CosmosAsyncContainer leaseContainer) {
    ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();
    return new ChangeFeedProcessorBuilder()
            .hostName(hostName)
            .options(options)
            .feedContainer(feedContainer)
            .leaseContainer(leaseContainer)
            .handleAllVersionsAndDeletesChanges((docs, context) -> {
                for (ChangeFeedProcessorItem item : docs) {
                    // the context identifies the lease (feed range) this batch of changes came from
                    String leaseToken = context.getLeaseToken();
                    // handling of the lease token corresponding to a batch of change feed processor items goes here
                }
            })
            .buildChangeFeedProcessor();
}

Hierarchical Partition Key Support in Spark Connector

Users can now use the Spark Connector to create containers with hierarchical partition keys in Azure Cosmos DB. In this PySpark sample we create a new container with hierarchical partition keys, ingest some data, then query using the first two levels in the hierarchy:

from pyspark.sql.types import StringType

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create an Azure Cosmos DB container with hierarchical partitioning using catalog api
cosmosDatabaseName = "Database"
cosmosHierarchicalContainerName = "HierarchicalPartitionKeyContainer"
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/tenantId,/userId,/sessionId', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosHierarchicalContainerName))

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosHierarchicalContainerName,
  "spark.cosmos.read.partitioning.strategy" : "Restrictive"
}

# ingest some data
spark.createDataFrame([("id1", "tenant 1", "User 1", "session 1"), ("id2", "tenant 1", "User 1", "session 1"), ("id3", "tenant 2", "User 1", "session 1")]) \
  .toDF("id", "tenantId", "userId", "sessionId") \
  .write \
  .format("cosmos.oltp") \
  .options(**cfg) \
  .mode("APPEND") \
  .save()

# query by filtering the first two levels in the hierarchy without a feedRangeFilter -
# this is less efficient as it will go through all physical partitions
query_df = spark.read.format("cosmos.oltp").options(**cfg) \
  .option("spark.cosmos.read.customQuery", "SELECT * from c where c.tenantId = 'tenant 1' and c.userId = 'User 1'").load()
query_df.show()

# prepare feed range to filter on first two levels in the hierarchy
spark.udf.registerJavaFunction("GetFeedRangeForPartitionKey", "com.azure.cosmos.spark.udf.GetFeedRangeForHierarchicalPartitionKeyValues", StringType())
pkDefinition = "{\"paths\":[\"/tenantId\",\"/userId\",\"/sessionId\"],\"kind\":\"MultiHash\"}"
pkValues = "[\"tenant 1\", \"User 1\"]"
feedRangeDf = spark.sql(f"SELECT GetFeedRangeForPartitionKey('{pkDefinition}', '{pkValues}')")
feedRange = feedRangeDf.collect()[0][0]

# query by filtering the first two levels in the hierarchy using feedRangeFilter (will target the physical partition in which all sub-partitions are co-located)
query_df = spark.read.format("cosmos.oltp").options(**cfg).option("spark.cosmos.partitioning.feedRangeFilter", feedRange).load()
query_df.show()

Fixes, patches, and enhancements

In addition to all of the above features, we have of course made a large number of smaller bug fixes, security patches, enhancements, and improvements. You can track all the changes for each client library, along with the minimum version we recommend you use, by viewing the change logs:

 

Get Started with Java in Azure Cosmos DB

About Azure Cosmos DB

Azure Cosmos DB is a fully managed and serverless distributed database for modern app development, with SLA-backed speed and availability, automatic and instant scalability, and support for open source PostgreSQL, MongoDB and Apache Cassandra. Try Azure Cosmos DB for free here. To stay in the loop on Azure Cosmos DB updates, follow us on Twitter, YouTube, and LinkedIn.

To easily build your first database, watch our Get Started videos on YouTube and explore ways to dev/test free.

Author

Theo van Kraay
Principal Program Manager

Principal Program Manager on the Azure Cosmos DB engineering team. Focused on Apache Cassandra offerings, Java ecosystem, high availability, and customer success.
