July 18th, 2024

Latest NoSQL Java Ecosystem Updates 2024 Q1 & Q2

Theo van Kraay
Principal Program Manager

We’re always busy adding new features, fixes, patches, and improvements to our Java-based client libraries for Azure Cosmos DB for NoSQL. In this regular blog series, we share highlights from the most recent period.

January – June 2024 updates

Java SDK Updates

Integrate ThroughputControl with ChangeFeedProcessor

Date: January 3, 2024 PR: #38052

This feature introduces local ThroughputControl for ChangeFeedProcessor (CFP), which provides an easy way for customers to start using throughput control without any extra resource provisioning. By integrating throughput control with CFP, users can set a low priority for the change feed processor workload, ensuring that it does not impact other workloads running in parallel. This prevents heavy throttling during times of high request unit (RU) usage.

Example Usage:

ThroughputControlGroupConfig throughputControlGroupConfig = 
    new ThroughputControlGroupConfigBuilder()
        .groupName("cfp")
        .targetThroughput(300)
        .priorityLevel(PriorityLevel.LOW)
        .build();

ChangeFeedProcessor changeFeedProcessor =
    new ChangeFeedProcessorBuilder()
        .hostName("test")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(docs -> {
            System.out.println(docs.size());
        })
        .options(
            new ChangeFeedProcessorOptions()
                .setFeedPollThroughputControlConfig(throughputControlGroupConfig)
                .setStartFromBeginning(true)
        )
        .buildChangeFeedProcessor();
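
To make the idea of a client-side ("local") throughput budget more concrete, here is a minimal sketch in plain Java. It is not the SDK's implementation: the class name LocalRuBudget, the one-second window, and the "check first, record the charge afterwards" behavior are all assumptions made for illustration.

```java
/** Hypothetical illustration only; not part of the Azure Cosmos DB Java SDK. */
final class LocalRuBudget {
    private final double targetRuPerSecond;
    private long windowStartMillis;
    private double consumedInWindow;

    LocalRuBudget(double targetRuPerSecond, long nowMillis) {
        this.targetRuPerSecond = targetRuPerSecond;
        this.windowStartMillis = nowMillis;
    }

    /**
     * Returns true if a request may be sent now. The charge is recorded after
     * the check, so one window can overshoot the target by a single request.
     */
    synchronized boolean tryConsume(double requestCharge, long nowMillis) {
        // Start a fresh one-second window once the current one has elapsed
        if (nowMillis - windowStartMillis >= 1000) {
            windowStartMillis = nowMillis;
            consumedInWindow = 0;
        }
        // Reject while the budget for the current window is already used up
        if (consumedInWindow >= targetRuPerSecond) {
            return false;
        }
        consumedInWindow += requestCharge;
        return true;
    }
}
```

Because the budget lives entirely in the client process, no extra container or coordination is needed, which is the property the local mode described above relies on.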

Added Payload Size Metrics for Gateway Mode

Date: February 8, 2024 PR: #38517

This update adds metrics for request and response payload sizes in Gateway mode. By monitoring these metrics (cosmos.client.req.reqPayloadSize and cosmos.client.req.rspPayloadSize), developers can better understand and optimize data transfer, which is crucial for managing costs and ensuring efficient performance.

Overloads for readMany with Request Options

Date: February 20, 2024 PR: #38821

New overloads for CosmosAsyncContainer.readMany and CosmosContainer.readMany allow specifying additional request options such as excluded regions, diagnostics thresholds, and end-to-end timeout. This provides greater flexibility and control over read operations, improving the ability to manage and optimize cross-region data access.

Example Usage:

List<CosmosItemIdentity> itemIdentityList = Arrays.asList(
    new CosmosItemIdentity(new PartitionKey("mypk1"), "item1"),
    new CosmosItemIdentity(new PartitionKey("mypk2"), "item2")
);
CosmosReadManyRequestOptions options = new CosmosReadManyRequestOptions()
    .setExcludedRegions(Collections.singletonList("region1"));

// The synchronous readMany call returns a FeedResponse; getResults() exposes the items
FeedResponse<MyItem> response = container.readMany(itemIdentityList, options, MyItem.class);

Public APIs for MicroBatch Size in CosmosBulkExecutionOptions

Date: March 25, 2024 PR: #39335

This feature makes the setMaxMicroBatchSize and getMaxMicroBatchSize APIs public in CosmosBulkExecutionOptions, allowing developers to customize the maximum batch size for bulk operations. This is particularly useful for optimizing throughput and performance based on specific workload characteristics.

Example Usage:

CosmosBulkExecutionOptions options = new CosmosBulkExecutionOptions()
    .setMaxMicroBatchSize(50);
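
As a rough illustration of what a maximum micro-batch size means, the sketch below splits a list of operations into chunks no larger than a given cap. It is a simplification under stated assumptions: MicroBatcher and split are hypothetical names, and the real bulk executor also takes partitioning into account, which is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; shows only the effect of capping the batch size. */
final class MicroBatcher {
    static <T> List<List<T>> split(List<T> operations, int maxMicroBatchSize) {
        if (maxMicroBatchSize < 1) {
            throw new IllegalArgumentException("maxMicroBatchSize must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        // Walk the input in steps of maxMicroBatchSize and copy each slice
        for (int start = 0; start < operations.size(); start += maxMicroBatchSize) {
            int end = Math.min(start + maxMicroBatchSize, operations.size());
            batches.add(new ArrayList<>(operations.subList(start, end)));
        }
        return batches;
    }
}
```

With a cap of 50, a list of 120 operations yields batches of 50, 50, and 20. A smaller cap means more, smaller requests; a larger cap means fewer, larger ones.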

Custom Item Serializer

Date: April 27, 2024 PR: #38997, #39933

Introduces new APIs getCustomItemSerializer and setCustomItemSerializer to enable custom payload transformations and serialization settings. These APIs allow developers to extend the built-in serialization, providing greater flexibility for handling complex data formats and custom transformations.

Example Usage:

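public class MyCustomSerializer extends CosmosItemSerializer {
    @Override
    public Map<String, Object> serialize(Object item) {
        // Custom serialization logic here: populate the map from the item
        Map<String, Object> map = new HashMap<>();
        return map;
    }

    @Override
    public <T> T deserialize(Map<String, Object> jsonNodeMap, Class<T> classType) {
        // Custom deserialization logic here: populate the instance from the map
        try {
            // Class.newInstance() is deprecated and throws checked exceptions
            return classType.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + classType.getName(), e);
        }
    }
}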

CosmosClientBuilder clientBuilder = new CosmosClientBuilder()
    .customItemSerializer(new MyCustomSerializer());
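
To show what a "custom payload transformation" could look like inside such a serializer, here is a self-contained sketch that converts a POJO to and from a map, storing a monetary amount as integer cents. The Order type, the OrderMapCodec helper, and the totalCents field name are hypothetical and only illustrate the map-based round trip; they are not part of the SDK.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical domain type used only for this illustration. */
final class Order {
    final String id;
    final double total;

    Order(String id, double total) {
        this.id = id;
        this.total = total;
    }
}

/** Hypothetical codec: the kind of logic a custom serializer might contain. */
final class OrderMapCodec {
    static Map<String, Object> serialize(Order order) {
        Map<String, Object> map = new HashMap<>();
        map.put("id", order.id);
        // Custom transformation: persist the amount as whole cents
        map.put("totalCents", Math.round(order.total * 100));
        return map;
    }

    static Order deserialize(Map<String, Object> map) {
        // Reverse the transformation when reading the document back
        long cents = ((Number) map.get("totalCents")).longValue();
        return new Order((String) map.get("id"), cents / 100.0);
    }
}
```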

Vector Search Support

Date: May 19, 2024 PR: #39379

Enhances container properties to support vector searches by adding vector embedding policies and vector indexes. This feature enables Vector Similarity Search in Cosmos DB, providing capabilities for building GenAI applications that rely on vector operations for tasks such as recommendations, image searches, and natural language processing. Check out our RAG samples repo for building Java RAG applications with the Java SDK and Langchain for Java in Azure Cosmos DB.

Sample JSON Configuration:

{
    "id": "1d14c70a-e60e-489a-afbc-bf3193fae4b9",
    "vectorEmbeddingPolicy": {
        "vectorEmbeddings": [
            {
                "path": "/vector1",
                "dataType": "float32",
                "dimensions": 3,
                "distanceFunction": "cosine"
            },
            {
                "path": "/vector2",
                "dataType": "int8",
                "dimensions": 3,
                "distanceFunction": "dotproduct"
            },
            {
                "path": "/vector3",
                "dataType": "uint8",
                "dimensions": 3,
                "distanceFunction": "euclidean"
            }
        ]
    },
    "indexingPolicy": {
        "vectorIndexes": [
            {
                "type": "flat",
                "path": "/vector1"
            },
            {
                "type": "quantizedFlat",
                "path": "/vector2"
            },
            {
                "type": "diskANN",
                "path": "/vector3"
            }
        ]
    }
}
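
The configuration above names three distance functions: cosine, dotproduct, and euclidean. For readers who want to see what each one computes, here is a small plain-Java sketch using the standard textbook definitions. The VectorDistances class is hypothetical, and the service evaluates these functions server-side, so this is for intuition only.

```java
/** Hypothetical helper with textbook definitions of the three distance functions. */
final class VectorDistances {
    /** Sum of element-wise products; larger means more aligned. */
    static double dotProduct(float[] a, float[] b) {
        requireSameLength(a, b);
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            sum += (double) a[i] * b[i];
        }
        return sum;
    }

    /** Cosine similarity in [-1, 1]; NaN if either vector has zero length. */
    static double cosine(float[] a, float[] b) {
        return dotProduct(a, b) / (Math.sqrt(dotProduct(a, a)) * Math.sqrt(dotProduct(b, b)));
    }

    /** Straight-line distance; smaller means closer. */
    static double euclidean(float[] a, float[] b) {
        requireSameLength(a, b);
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double diff = (double) a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    private static void requireSameLength(float[] a, float[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("Vectors must have the same number of dimensions");
        }
    }
}
```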


Support for Non-streaming OrderBy

Date: May 19, 2024 PR: #39897

Added a non-streaming OrderBy query mode, crucial for vector search capabilities.

Example Query:

public Iterable<Recipe> vectorSearch(List<Double> vector) {
    // Convert the query embedding from Double to Float values
    Object[] embedding = vector.stream().map(Double::floatValue).toArray();
    List<SqlParameter> paramList = new ArrayList<>();
    paramList.add(new SqlParameter("@embedding", embedding));
    SqlQuerySpec querySpec = new SqlQuerySpec(
        "SELECT TOP 3 c.name, c.description, c.embedding, c.cuisine, c.difficulty, c.prepTime, c.cookTime, c.totalTime, c.servings, c.ingredients, c.instructions, "
            + "VectorDistance(c.embedding, @embedding) AS SimilarityScore "
            + "FROM c ORDER BY VectorDistance(c.embedding, @embedding)", paramList);
    // collectList() gathers every result page; block() waits for the async query to finish
    return container.queryItems(querySpec, new CosmosQueryRequestOptions(), Recipe.class)
        .collectList().block();
}
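
One way to picture a non-streaming ORDER BY is that results from every partition are gathered first and only then ordered, instead of being merged from already-sorted streams. The sketch below shows that idea with a bounded heap that keeps the best k results. It is an assumption-laden illustration, not the SDK's code: NonStreamingTopK and Scored are hypothetical names, and it assumes a smaller distance means a closer match (as with Euclidean distance).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration of gathering unsorted partial results into a global top-k. */
final class NonStreamingTopK {
    static final class Scored {
        final String id;
        final double distance;

        Scored(String id, double distance) {
            this.id = id;
            this.distance = distance;
        }
    }

    static List<Scored> topK(List<List<Scored>> partitionResults, int k) {
        // Max-heap on distance: the worst of the current best k sits on top
        PriorityQueue<Scored> heap = new PriorityQueue<>(
            Comparator.comparingDouble((Scored s) -> s.distance).reversed());
        for (List<Scored> partition : partitionResults) {
            for (Scored candidate : partition) {
                heap.add(candidate);
                if (heap.size() > k) {
                    heap.poll(); // evict the farthest candidate
                }
            }
        }
        // Order the survivors from closest to farthest
        List<Scored> result = new ArrayList<>(heap);
        result.sort(Comparator.comparingDouble(s -> s.distance));
        return result;
    }
}
```

The input lists do not need to be sorted, which is the point: the ordering is produced only after all partial results have been seen.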

Regional Scope for Session Tokens

Date: May 19, 2024 PR: #38003

Introduces regionally scoped session tokens to better manage and reduce potential cross-regional replication delays. By maintaining session tokens at a regional level, the SDK can minimize retries and enhance performance, especially in globally distributed applications.

Example Configuration:

System.setProperty("COSMOS.SESSION_CAPTURING_TYPE", "REGION_SCOPED");

Query Statement in Diagnostics and Tracing

Date: May 24, 2024 PR: #39990

Enhances diagnostics with a new option to conditionally print query statements in the db.statement attribute. Users can choose between NONE, PARAMETERIZED_ONLY, and ALL, offering flexibility in managing sensitive information while still benefiting from detailed diagnostics.

Code Snippet:

new CosmosClientTelemetryConfig().showQueryOptions(CosmosClientTelemetryConfig.ShowQueryOptions.ALL);
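
The decision behind the three options can be sketched as follows. This is an interpretation rather than the SDK's code: it assumes PARAMETERIZED_ONLY means the statement is emitted only when values are passed as parameters (so no literals appear in the text), and the names ShowQueryChoice and QueryStatementFilter are hypothetical.

```java
import java.util.Optional;

/** Hypothetical stand-in for the three options named above. */
enum ShowQueryChoice { NONE, PARAMETERIZED_ONLY, ALL }

/** Hypothetical helper deciding whether a statement is safe to record. */
final class QueryStatementFilter {
    static Optional<String> statementForDiagnostics(
            ShowQueryChoice choice, String queryText, boolean isParameterized) {
        switch (choice) {
            case ALL:
                // Always record the statement, including any inline literals
                return Optional.of(queryText);
            case PARAMETERIZED_ONLY:
                // Record only when values travel as parameters, not in the text
                return isParameterized ? Optional.of(queryText) : Optional.empty();
            case NONE:
            default:
                return Optional.empty();
        }
    }
}
```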

Spring Connector Updates

IndexQueryMetrics in Configuration

Date: May 7, 2024 PR: #39433, #39623

Exposes indexQueryMetrics to the CosmosConfig via the application.properties configuration file. This feature allows developers to gain insights into query performance, helping them optimize and troubleshoot queries more effectively.

Example Configuration:

azure.cosmos.index-query-metrics-enabled=true

Support for Transient Annotation

Date: June 3, 2024 PR: #39760

Fields annotated with @Transient will not be persisted to the database during operations like save(), saveAll(), and upsert(). However, they will continue to be serialized in the returned entity objects. This feature is useful for excluding non-persistent fields from database operations while still keeping them in the application logic.

Example Usage:

@Container
public class MyEntity {
    @Id
    private String id;

    private String name;

    @Transient
    private String nonPersistentField;

    // getters and setters
}
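
The effect of skipping annotated fields at write time can be illustrated with plain reflection. The sketch below is not how Spring Data Cosmos implements the feature; LocalTransient is a hypothetical stand-in for the real @Transient annotation, and Customer and PersistedFields are made-up names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a transient marker annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface LocalTransient {}

/** Hypothetical entity with one field that should not be persisted. */
final class Customer {
    private final String id;
    private final String name;
    @LocalTransient
    private final String scratchNote;

    Customer(String id, String name, String scratchNote) {
        this.id = id;
        this.name = name;
        this.scratchNote = scratchNote;
    }
}

final class PersistedFields {
    /** Builds the document to store, leaving out fields marked as transient. */
    static Map<String, Object> toPersistedMap(Object entity) {
        Map<String, Object> document = new LinkedHashMap<>();
        for (Field field : entity.getClass().getDeclaredFields()) {
            if (field.isSynthetic() || field.isAnnotationPresent(LocalTransient.class)) {
                continue; // not part of the stored document
            }
            try {
                field.setAccessible(true);
                document.put(field.getName(), field.get(entity));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return document;
    }
}
```

The in-memory object keeps its scratchNote value; only the map that would be written to the database omits it.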

Hierarchical Partition Key Support

Date: June 30, 2024 PR: #38365

Introduces hierarchical partition key support in the Spring Data Cosmos module, allowing more granular control over data partitioning. This is particularly beneficial for applications with complex data models that require multi-level partitioning.

Example Usage:

PartitionKey pk = new PartitionKeyBuilder()
    .add(entity.getId())
    .add(entity.getFirstName())
    .add(entity.getLastName())
    .build();

Optional<HierarchicalPartitionKeyEntity> result = repository.findById(entity.getId(), pk);
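
Conceptually, a hierarchical partition key is an ordered list of values in which a leading subset (a prefix) still narrows down where data lives. The sketch below models just that idea in plain Java. HierarchicalKey is a hypothetical class, not the SDK's PartitionKey, and the limit of three levels reflects the service limit as I understand it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of a multi-level partition key. */
final class HierarchicalKey {
    private final List<String> levels;

    HierarchicalKey(String... levels) {
        // Assumption: between one and three levels are allowed
        if (levels.length < 1 || levels.length > 3) {
            throw new IllegalArgumentException("Expected between 1 and 3 key levels");
        }
        this.levels = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(levels)));
    }

    /** True when this key's levels match the leading levels of the other key. */
    boolean isPrefixOf(HierarchicalKey other) {
        return levels.size() <= other.levels.size()
            && other.levels.subList(0, levels.size()).equals(levels);
    }
}
```

For example, a key built from only the first level is a prefix of the full three-level key, while a key that differs in the first level is not.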

Spark Connector Updates

Configurable Connection Pool Size

Date: January 14, 2024 PR: #38305

Adds the option spark.cosmos.http.connectionPoolSize to override the HTTP connection pool size in Gateway mode, addressing limits for high-concurrency scenarios. This is useful for containers with a high number of partitions, ensuring efficient and scalable network management.

Example Configuration:

val cfg = Map(
    "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
    "spark.cosmos.accountKey" -> cosmosMasterKey,
    "spark.cosmos.http.connectionPoolSize" -> "2000"
)

Optimization for readMany Queries

Date: February 8, 2024 PR: #38299

Optimizes queries by using the readMany API when applicable, improving performance for joins and large dataset reads. This feature allows Spark to leverage the efficiency of readMany for retrieving large volumes of data based on dynamic partition pruning, significantly reducing query time and resource utilization. The readMany method in the Java SDK is now also exposed as an API in the Spark Connector, turbocharging “read many items fast” scenarios. For a complete Scala demo sample, see here. For more information, check out this YouTube video from Cosmos Conf 2024!

Example Usage:

import com.azure.cosmos.spark.udf.GetCosmosItemIdentityValue
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// Needed for cfg.asJava below
import scala.collection.JavaConverters._

// Configurations
val cfg = Map(
    "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
    "spark.cosmos.accountKey" -> cosmosMasterKey,
    "spark.cosmos.database" -> "myDatabase",
    "spark.cosmos.container" -> "myContainer",
    "spark.cosmos.read.readManyFiltering.enabled" -> "true",
    "spark.cosmos.read.runtimeFiltering.enabled" -> "true"
)

// Register UDF for item identity
spark.udf.register("GetCosmosItemIdentityValue", new GetCosmosItemIdentityValue(), StringType)

// Construct _itemIdentity for readMany
val identityDF = inputDf.withColumn("_itemIdentity", expr("GetCosmosItemIdentityValue(id, array(pk))"))

// Read many using the DataFrame with _itemIdentity
val readManyDf = com.azure.cosmos.spark.CosmosItemsDataSource.readMany(identityDF._jdf, cfg.asJava, schema)

Custom Schema Support for Nested Properties

Date: February 8, 2024 PR: #38481

Allows flattening a nested ObjectNode into a StringType for custom schemas. This feature simplifies the handling of complex nested JSON structures by allowing developers to store them as raw JSON strings, which can be more efficient and easier to manage in certain use cases.

Example Usage:

val schema = StructType(Seq(
  StructField("nestedField", StringType), // Flattened nested object
  StructField("anotherField", StringType)
))

val row = defaultRowConverter.fromObjectNodeToRow(schema, objectNode, SchemaConversionModes.Relaxed)

Reuse CosmosAsyncClient for Spark Applications

Date: February 20, 2024 PR: #38834

Provides an API to retrieve the CosmosAsyncClient used internally by the connector, promoting client reuse across Spark applications. This reduces the overhead of creating multiple client instances and helps maintain consistent configuration across different parts of the application.

Example Usage:

val cosmosClient = CosmosAsyncClientCache.getCosmosClientFromCache(cfg.asJava).getClient

Spark 3.5 Support

Date: April 16, 2024 PR: #39395

Adds support for Spark 3.5, ensuring compatibility with the latest Spark release. This update allows developers to take advantage of the newest features and improvements in Spark, enhancing the performance and capabilities of their data processing pipelines.

Native Netty Transport

Date: April 27, 2024 PR: #39834

Introduces the option to use native Netty transport in Spark, which is more efficient, especially for high-connection scenarios. Native Netty transport improves network performance and resource utilization, making it ideal for applications with heavy network traffic.

Example Configuration:

val cfg = Map(
    "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
    "spark.cosmos.accountKey" -> cosmosMasterKey,
    "spark.cosmos.enforceNativeTransport" -> "true"
)

ManagedIdentity Authentication in Databricks

Date: April 27, 2024 PR: #39870

Supports Managed Identity authentication in Databricks, simplifying authentication setup in Azure environments. This feature allows seamless and secure integration with Azure Cosmos DB without the need to manage service principals or client secrets.

Example Usage:

val cfg = Map(
    "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
    "spark.cosmos.auth.type" -> "ManagedIdentity",
    "spark.cosmos.account.subscriptionId" -> subscriptionId,
    "spark.cosmos.account.tenantId" -> tenantId,
    "spark.cosmos.account.resourceGroupName" -> resourceGroupName,
    "spark.cosmos.database" -> cosmosDatabaseName,
    "spark.cosmos.container" -> cosmosContainerName
)

For a complete notebook sample, see here.

Support for Access Tokens via AccountDataResolver

Date: May 20, 2024 PR: #40079

Introduces support for using access tokens in Spark environments via AccountDataResolver, enabling more flexible authentication scenarios. This feature allows custom implementations to provide access tokens, enabling secure and dynamic authentication methods.

SPN Authentication with Certificates

Date: May 24, 2024 PR: #40325

Allows using Service Principal Name (SPN) authentication with certificates instead of client secrets, providing a more secure and manageable way to authenticate Spark applications. This feature simplifies security management by allowing the use of certificates, which can be rotated and managed more easily than secrets.

Example Usage:

val cfg = Map(
    "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
    "spark.cosmos.auth.type" -> "ServicePrincipal",
    "spark.cosmos.auth.aad.clientId" -> clientId,
    "spark.cosmos.auth.aad.clientCertPemBase64" -> clientCert,
    "spark.cosmos.account.tenantId" -> tenantId,
    "spark.cosmos.database" -> cosmosDatabaseName,
    "spark.cosmos.container" -> cosmosContainerName
)

Kafka Connector Updates

Major V2 Release

Date: April 26, 2024 Blog: Enhancements in the Kafka Connector for Azure Cosmos DB

The V2 release of the Kafka Source and Sink connectors introduces significant improvements in scalability, performance, and flexibility. These enhancements are designed to provide better load balancing, improved handling of high-throughput scenarios, and more customizable configuration options. For more details, read the official blog post.

Key Benefits:

  • Scalability: Improved handling of larger datasets and higher message rates, ensuring robust performance under load.
  • Performance: Optimized operations to reduce latency and increase throughput.
  • Flexibility: Enhanced configuration options to tailor the connector’s behavior to specific use cases.

Fixes, patches, and enhancements

In addition to all of the above features, we have of course made a large number of smaller bug fixes, security patches, enhancements, and improvements. You can track all the changes for each client library, along with the minimum version we recommend you use, by viewing that library's change log.

Get Started with Java in Azure Cosmos DB

About Azure Cosmos DB

Azure Cosmos DB is a fully managed and serverless distributed database for modern app development, with SLA-backed speed and availability, automatic and instant scalability, and support for open-source PostgreSQL, MongoDB, and Apache Cassandra. Try Azure Cosmos DB for free here. To stay in the loop on Azure Cosmos DB updates, follow us on X, YouTube, and LinkedIn.

To easily build your first database, watch our Get Started videos on YouTube and explore ways to dev/test free.

