We’re always busy adding new features, fixes, patches, and improvements to our Java-based client libraries for Azure Cosmos DB for NoSQL. In this regular blog series, we share highlights of recent updates from the past several months.
January – June 2024 updates
- Java SDK Updates
- Integrate ThroughputControl with ChangeFeedProcessor
- Added Payload Size Metrics for Gateway Mode
- Overloads for readMany with Request Options
- Public APIs for MicroBatch Size in CosmosBulkExecutionOptions
- Custom Item Serializer
- Vector Embedding Policy and Indexes for Vector Search
- Support for Non-streaming OrderBy
- Regional Scope for Session Tokens
- Query Statement in Diagnostics and Tracing
- Spring Connector Updates
- Spark Connector Updates
- Configurable Connection Pool Size
- Optimization for readMany Queries
- Custom Schema Support for Nested Properties
- Reuse CosmosAsyncClient for Spark Applications
- Spark 3.5 Support
- Native Netty Transport
- ManagedIdentity Authentication in Databricks
- Support for Access Tokens via AccountDataResolver
- SPN Authentication with Certificates
- Kafka Connector Updates
- Fixes, patches, enhancements
Java SDK Updates
Integrate ThroughputControl with ChangeFeedProcessor
Date: January 3, 2024 PR: #38052
This feature introduces local ThroughputControl for ChangeFeedProcessor (CFP), which provides an easy way for customers to start using throughput control without any extra resource provisioning. By integrating throughput control with CFP, users can set a low priority for the change feed processor workload, ensuring that it does not impact other workloads running in parallel. This prevents heavy throttling during times of high request unit (RU) usage.
Example Usage:
ThroughputControlGroupConfig throughputControlGroupConfig =
    new ThroughputControlGroupConfigBuilder()
        .groupName("cfp")
        .targetThroughput(300)
        .priorityLevel(PriorityLevel.LOW)
        .build();

ChangeFeedProcessor changeFeedProcessor =
    new ChangeFeedProcessorBuilder()
        .hostName("test")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(docs -> {
            System.out.println(docs.size());
        })
        .options(
            new ChangeFeedProcessorOptions()
                .setFeedPollThroughputControlConfig(throughputControlGroupConfig)
                .setStartFromBeginning(true)
        )
        .buildChangeFeedProcessor();
Added Payload Size Metrics for Gateway Mode
Date: February 8, 2024 PR: #38517
This update adds metrics for request and response payload sizes in Gateway mode. By monitoring these metrics (cosmos.client.req.reqPayloadSize and cosmos.client.req.rspPayloadSize), developers can better understand and optimize data transfer, which is crucial for managing costs and ensuring efficient performance.
Overloads for readMany with Request Options
Date: February 20, 2024 PR: #38821
New overloads for CosmosAsyncContainer.readMany and CosmosContainer.readMany allow specifying additional request options such as excluded regions, diagnostics thresholds, and end-to-end timeout. This provides greater flexibility and control over read operations, improving the ability to manage and optimize cross-region data access.
Example Usage:
List<CosmosItemIdentity> itemIdentityList = Arrays.asList(
    new CosmosItemIdentity(new PartitionKey("mypk1"), "item1"),
    new CosmosItemIdentity(new PartitionKey("mypk2"), "item2")
);

CosmosReadManyRequestOptions options = new CosmosReadManyRequestOptions()
    .setExcludedRegions(Collections.singletonList("region1"));

CosmosPagedIterable<MyItem> items = container.readMany(itemIdentityList, options, MyItem.class);
Public APIs for MicroBatch Size in CosmosBulkExecutionOptions
Date: March 25, 2024 PR: #39335
This feature makes the setMaxMicroBatchSize and getMaxMicroBatchSize APIs public in CosmosBulkExecutionOptions, allowing developers to customize the maximum batch size for bulk operations. This is particularly useful for optimizing throughput and performance based on specific workload characteristics.
Example Usage:
CosmosBulkExecutionOptions options = new CosmosBulkExecutionOptions()
.setMaxMicroBatchSize(50);
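To make the effect of the setting concrete, here is a minimal, self-contained sketch of micro-batching. This is an illustration of what a cap like setMaxMicroBatchSize(50) controls, not the SDK's internal implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class MicroBatchSketch {

    // Split a list of bulk operations into micro-batches,
    // each holding at most maxMicroBatchSize operations.
    static <T> List<List<T>> toMicroBatches(List<T> operations, int maxMicroBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < operations.size(); i += maxMicroBatchSize) {
            int end = Math.min(i + maxMicroBatchSize, operations.size());
            batches.add(new ArrayList<>(operations.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ops = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            ops.add(i);
        }
        // 120 operations with a cap of 50 yield batches of 50, 50, and 20.
        List<List<Integer>> batches = toMicroBatches(ops, 50);
        System.out.println(batches.size()); // prints 3
    }
}
```

Smaller batches reduce the blast radius of a throttled batch at the cost of more round trips; larger batches do the opposite, which is why tuning this per workload can matter.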
Custom Item Serializer
Date: April 27, 2024 PR: #38997, #39933
Introduces new APIs getCustomItemSerializer and setCustomItemSerializer to enable custom payload transformations and serialization settings. These APIs allow developers to extend the built-in serialization, providing greater flexibility for handling complex data formats and custom transformations.
Example Usage:
public class MyCustomSerializer extends CosmosItemSerializer {
    @Override
    public Map<String, Object> serialize(Object item) {
        // Custom serialization logic here
        Map<String, Object> map = new HashMap<>();
        return map;
    }

    @Override
    public <T> T deserialize(Map<String, Object> jsonNodeMap, Class<T> classType) {
        // Custom deserialization logic here
        try {
            return classType.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

CosmosClientBuilder clientBuilder = new CosmosClientBuilder()
    .customItemSerializer(new MyCustomSerializer());
Vector Embedding Policy and Indexes for Vector Search
Date: May 19, 2024 PR: #39379
Enhances container properties to support vector searches by adding vector embedding policies and vector indexes. This feature enables Vector Similarity Search in Cosmos DB, providing capabilities for building GenAI applications that rely on vector operations for tasks such as recommendations, image search, and natural language processing. Check out our RAG samples repo for building Java RAG applications with the Java SDK and LangChain for Java in Azure Cosmos DB.
Sample JSON Configuration:
{
    "id": "1d14c70a-e60e-489a-afbc-bf3193fae4b9",
    "vectorEmbeddingPolicy": {
        "vectorEmbeddings": [
            {
                "path": "/vector1",
                "dataType": "float32",
                "dimensions": 3,
                "distanceFunction": "cosine"
            },
            {
                "path": "/vector2",
                "dataType": "int8",
                "dimensions": 3,
                "distanceFunction": "dotproduct"
            },
            {
                "path": "/vector3",
                "dataType": "uint8",
                "dimensions": 3,
                "distanceFunction": "euclidean"
            }
        ]
    },
    "indexingPolicy": {
        "vectorIndexes": [
            {
                "type": "flat",
                "path": "/vector1"
            },
            {
                "type": "quantizedFlat",
                "path": "/vector2"
            },
            {
                "type": "diskANN",
                "path": "/vector3"
            }
        ]
    }
}
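The three distanceFunction values correspond to standard vector similarity measures. The following self-contained sketch (illustrative only, not SDK code) shows what each one computes:

```java
public class VectorDistances {

    // Dot product: sum of element-wise products; larger means more similar.
    static double dot(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            s += a[i] * b[i];
        }
        return s;
    }

    // Cosine similarity: dot product of the normalized vectors, in [-1, 1].
    static double cosine(double[] a, double[] b) {
        return dot(a, b) / (Math.sqrt(dot(a, a)) * Math.sqrt(dot(b, b)));
    }

    // Euclidean distance: straight-line distance; smaller means more similar.
    static double euclidean(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            s += (a[i] - b[i]) * (a[i] - b[i]);
        }
        return Math.sqrt(s);
    }

    public static void main(String[] args) {
        double[] u = {1, 0, 0};
        double[] v = {0, 1, 0};
        System.out.println(cosine(u, v));    // prints 0.0 (orthogonal vectors)
        System.out.println(euclidean(u, v)); // prints sqrt(2), about 1.414
    }
}
```

Which measure fits best depends on the embedding model; many text-embedding models produce normalized vectors, for which cosine and dot product rank results identically.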
Support for Non-streaming OrderBy
Date: May 19, 2024 PR: #39897
Adds support for non-streaming ORDER BY query execution, which vector search queries (for example, ORDER BY VectorDistance) rely on: results must be fully gathered and ranked rather than streamed back in index order.
Example Query:
public Iterable<Recipe> vectorSearch(List<Double> vector) {
    List<Float> embedding = vector.stream()
        .map(Double::floatValue)
        .collect(Collectors.toList());

    List<SqlParameter> paramList = new ArrayList<>();
    paramList.add(new SqlParameter("@embedding", embedding.toArray()));

    SqlQuerySpec querySpec = new SqlQuerySpec(
        "SELECT TOP 3 c.name, c.description, c.embedding, c.cuisine, c.difficulty, " +
        "c.prepTime, c.cookTime, c.totalTime, c.servings, c.ingredients, c.instructions, " +
        "VectorDistance(c.embedding, @embedding) AS SimilarityScore " +
        "FROM c ORDER BY VectorDistance(c.embedding, @embedding)",
        paramList);

    return container.queryItems(querySpec, new CosmosQueryRequestOptions(), Recipe.class)
        .collectList()
        .block();
}
Regional Scope for Session Tokens
Date: May 19, 2024 PR: #38003
Introduces regionally scoped session tokens to better manage and reduce potential cross-regional replication delays. By maintaining session tokens at a regional level, the SDK can minimize retries and enhance performance, especially in globally distributed applications.
Example Configuration:
System.setProperty("COSMOS.SESSION_CAPTURING_TYPE", "REGION_SCOPED");
Query Statement in Diagnostics and Tracing
Date: May 24, 2024 PR: #39990
Enhances diagnostics with a new option to conditionally print query statements in the db.statement attribute. Users can choose between NONE, PARAMETERIZED_ONLY, and ALL, offering flexibility in managing sensitive information while still benefiting from detailed diagnostics.
Code Snippet:
new CosmosClientTelemetryConfig().showQueryOptions(CosmosClientTelemetryConfig.ShowQueryOptions.ALL);
Spring Connector Updates
IndexQueryMetrics in Configuration
Date: May 7, 2024 PR: #39433, #39623
Exposes indexQueryMetrics to the CosmosConfig via the application.properties configuration file. This feature allows developers to gain insights into query performance, helping them optimize and troubleshoot queries more effectively.
Example Configuration:
azure.cosmos.index-query-metrics-enabled=true
Support for Transient Annotation
Date: June 3, 2024 PR: #39760
Fields annotated with @Transient will not be persisted to the database during operations like save(), saveAll(), and upsert(). However, they will continue to be serialized in the returned entity objects. This feature is useful for excluding non-persistent fields from database operations while still keeping them in the application logic.
Example Usage:
@Container(containerName = "myContainer")
public class MyEntity {
    @Id
    private String id;

    private String name;

    @Transient
    private String nonPersistentField;

    // getters and setters
}
Hierarchical Partition Key Support
Date: June 30, 2024 PR: #38365
Introduces hierarchical partition key support in the Spring Data Cosmos module, allowing more granular control over data partitioning. This is particularly beneficial for applications with complex data models that require multi-level partitioning.
Example Usage:
PartitionKey pk = new PartitionKeyBuilder()
    .add(entity.getId())
    .add(entity.getFirstName())
    .add(entity.getLastName())
    .build();

Optional<HierarchicalPartitionKeyEntity> result = repository.findById(entity.getId(), pk);
Spark Connector Updates
Configurable Connection Pool Size
Date: January 14, 2024 PR: #38305
Adds the option spark.cosmos.http.connectionPoolSize to override the HTTP connection pool size in Gateway mode, addressing limits for high-concurrency scenarios. This is useful for containers with a high number of partitions, ensuring efficient and scalable network management.
Example Configuration:
val cfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.http.connectionPoolSize" -> "2000"
)
Optimization for readMany Queries
Date: February 8, 2024 PR: #38299
Optimizes queries by using the readMany API when applicable, improving performance for joins and large dataset reads. This feature allows Spark to leverage the efficiency of readMany for retrieving large volumes of data based on dynamic partition pruning, significantly reducing query time and resource utilization. The readMany method in the Java SDK is now also exposed as an API in the Spark Connector, turbocharging “read many items fast” scenarios. For a complete Scala demo sample, see here. For more information, check out this YouTube video from Cosmos Conf 2024!
Example Usage:
import com.azure.cosmos.spark.udf.GetCosmosItemIdentityValue
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// Configurations
val cfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> "myDatabase",
  "spark.cosmos.container" -> "myContainer",
  "spark.cosmos.read.readManyFiltering.enabled" -> "true",
  "spark.cosmos.read.runtimeFiltering.enabled" -> "true"
)
// Register UDF for item identity
spark.udf.register("GetCosmosItemIdentityValue", new GetCosmosItemIdentityValue(), StringType)
// Construct _itemIdentity for readMany
val identityDF = inputDf.withColumn("_itemIdentity", expr("GetCosmosItemIdentityValue(id, array(pk))"))
// Read many using the DataFrame with _itemIdentity
val readManyDf = com.azure.cosmos.spark.CosmosItemsDataSource.readMany(identityDF._jdf, cfg.asJava, schema)
Custom Schema Support for Nested Properties
Date: February 8, 2024 PR: #38481
Allows flattening a nested ObjectNode into a StringType for custom schemas. This feature simplifies the handling of complex nested JSON structures by allowing developers to store them as raw JSON strings, which can be more efficient and easier to manage in certain use cases.
Example Usage:
val schema = StructType(Seq(
  StructField("nestedField", StringType), // Flattened nested object
  StructField("anotherField", StringType)
))

val row = defaultRowConverter.fromObjectNodeToRow(schema, objectNode, SchemaConversionModes.Relaxed)
Reuse CosmosAsyncClient for Spark Applications
Date: February 20, 2024 PR: #38834
Provides an API to retrieve the CosmosAsyncClient used internally by the connector, promoting client reuse across Spark applications. This reduces the overhead of creating multiple client instances and helps maintain consistent configuration across different parts of the application.
Example Usage:
val cosmosClient = CosmosAsyncClientCache.getCosmosClientFromCache(cfg.asJava).getClient
Spark 3.5 Support
Date: April 16, 2024 PR: #39395
Adds support for Spark 3.5, ensuring compatibility with the latest Spark release. This update allows developers to take advantage of the newest features and improvements in Spark, enhancing the performance and capabilities of their data processing pipelines.
Native Netty Transport
Date: April 27, 2024 PR: #39834
Introduces the option to use native Netty transport in Spark, which is more efficient, especially for high-connection scenarios. Native Netty transport improves network performance and resource utilization, making it ideal for applications with heavy network traffic.
Example Configuration:
val cfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.enforceNativeTransport" -> "true"
)
ManagedIdentity Authentication in Databricks
Date: April 27, 2024 PR: #39870
Supports Managed Identity authentication in Databricks, simplifying authentication setup in Azure environments. This feature allows seamless and secure integration with Azure Cosmos DB without the need to manage service principals or client secrets.
Example Usage:
val cfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.auth.type" -> "ManagedIdentity",
  "spark.cosmos.account.subscriptionId" -> subscriptionId,
  "spark.cosmos.account.tenantId" -> tenantId,
  "spark.cosmos.account.resourceGroupName" -> resourceGroupName,
  "spark.cosmos.database" -> cosmosDatabaseName,
  "spark.cosmos.container" -> cosmosContainerName
)
For a complete notebook sample, see here.
Support for Access Tokens via AccountDataResolver
Date: May 20, 2024 PR: #40079
Introduces support for using access tokens in Spark environments via AccountDataResolver, enabling more flexible authentication scenarios. This feature allows custom implementations to provide access tokens, enabling secure and dynamic authentication methods.
SPN Authentication with Certificates
Date: May 24, 2024 PR: #40325
Allows using Service Principal Name (SPN) authentication with certificates instead of client secrets, providing a more secure and manageable way to authenticate Spark applications. This feature simplifies security management by allowing the use of certificates, which can be rotated and managed more easily than secrets.
Example Usage:
val cfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.auth.type" -> "ServicePrincipal",
  "spark.cosmos.auth.aad.clientId" -> clientId,
  "spark.cosmos.auth.aad.clientCertPemBase64" -> clientCert,
  "spark.cosmos.account.tenantId" -> tenantId,
  "spark.cosmos.database" -> cosmosDatabaseName,
  "spark.cosmos.container" -> cosmosContainerName
)
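The property name clientCertPemBase64 suggests the value is the certificate's PEM content encoded as Base64. Assuming that reading is correct (verify against the connector documentation), the value can be produced with the JDK alone; the certificate text below is a placeholder:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class CertEncoding {

    // Base64-encode the raw PEM text so it can be passed as a single config value.
    static String pemToBase64(String pemContent) {
        return Base64.getEncoder().encodeToString(pemContent.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Placeholder PEM content; in practice, read your service principal's
        // certificate file (for example with Files.readString).
        String pem = "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----\n";
        System.out.println(pemToBase64(pem));
    }
}
```

Decoding the resulting string with any standard Base64 decoder yields the original PEM text, which makes it easy to sanity-check the value before wiring it into the Spark configuration.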
Kafka Connector Updates
Major V2 Release
Date: April 26, 2024 Blog: Enhancements in the Kafka Connector for Azure Cosmos DB
The V2 release of the Kafka Source and Sink connectors introduces significant improvements in scalability, performance, and flexibility. These enhancements are designed to provide better load balancing, improved handling of high-throughput scenarios, and more customizable configuration options. For more details, read the official blog post.
Key Benefits:
- Scalability: Improved handling of larger datasets and higher message rates, ensuring robust performance under load.
- Performance: Optimized operations to reduce latency and increase throughput.
- Flexibility: Enhanced configuration options to tailor the connector’s behavior to specific use cases.
Fixes, patches, and enhancements
In addition to all of the above features, we have of course made a large number of smaller bug fixes, security patches, enhancements, and improvements. You can track all the changes for each client library, along with the minimum version we recommend you use, by viewing the change logs:
- Java SDK change log
- Spring Data Client Library change log
- OLTP Spark Connector change log
- Kafka Connectors change log
Get Started with Java in Azure Cosmos DB
- Azure Cosmos DB Java SDK v4 technical documentation
- Diagnose and troubleshoot Azure Cosmos DB Java SDK v4
- Azure Cosmos DB Java SDK v4 getting started sample application
- Java V4 SDK comprehensive samples repository
- Azure Cosmos DB Spring Data Client Library Samples
- Cosmic Works Java
- Release notes and additional resources
- Exploring the Async API (reactor programming)
About Azure Cosmos DB
Azure Cosmos DB is a fully managed and serverless distributed database for modern app development, with SLA-backed speed and availability, automatic and instant scalability, and support for open-source PostgreSQL, MongoDB, and Apache Cassandra. Try Azure Cosmos DB for free here. To stay in the loop on Azure Cosmos DB updates, follow us on X, YouTube, and LinkedIn.
To easily build your first database, watch our Get Started videos on YouTube and explore ways to dev/test free.