January 20th, 2022

Read many items fast with the Java SDK for Azure Cosmos DB

Theo van Kraay
Principal Program Manager

Typically, when retrieving many records from a database, you will run filter or aggregation queries. In these scenarios, latency is usually less important than the accuracy of the results. However, there are some use cases where many separate records need to be retrieved very quickly. For example, you may be building a comparison site where thousands of entities are maintained independently but frequently need to be fetched together when users run comparisons.

In this situation, you face a trade-off between the simplicity (but higher latency) of running one large query and the lower latency (but higher client-side compute cost and complexity) of hitting the database with many point reads in parallel.

The following has been extracted from a real-world example and codified into a more generic sample. The sample demonstrates how the readMany method in the Azure Cosmos DB Java SDK can help in this scenario. In this case, we use readMany with CosmosAsyncContainer in a multi-threaded application built on the Reactor framework (note: the Azure Cosmos DB .NET SDK has an equivalent method, which can be used in the same way).

Suppose you have a client machine (or machines), and you want to bring back 1,000 records using point reads as quickly as possible. Our demo sample compares the performance of two methods.

First, we have a method that executes point reads using as many concurrent threads as possible on the local client machine:

    private void readManyItems() throws InterruptedException {
        // collect the ids that were generated when writing the data.
        List<String> list = new ArrayList<String>();
        for (final JsonNode doc : docs) {
            list.add(doc.get("id").asText());
        }
        final long startTime = System.currentTimeMillis();
        Flux.fromIterable(list)
                .flatMap(id -> container.readItem(id, new PartitionKey(id), Item.class))
                .flatMap(itemResponse -> {
                    if (itemResponse.getStatusCode() == 200) {
                        double requestCharge = itemResponse.getRequestCharge();
                        BinaryOperator<Double> add = (u, v) -> u + v;
                        totalRequestCharges.getAndAccumulate(requestCharge, add);
                    } else {
                        // pass the status code as a logger argument so the {} placeholder is filled in
                        logger.info("WARNING read status code {} != 200", itemResponse.getStatusCode());
                    }
                    request_count.getAndIncrement();
                    return Mono.empty();
                }).subscribe();
        logger.info("Waiting while subscribed async operation completes all threads...");
        while (request_count.get() < NUMBER_OF_DOCS) {
            // looping while subscribed async operation completes all threads
        }
        if (request_count.get() == NUMBER_OF_DOCS) {
            request_count.set(0);
            final long endTime = System.currentTimeMillis();
            final long duration = (endTime - startTime);
            totalReadLatency.getAndAdd(duration);
        }
    }
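
The while loop at the end of this method spins on request_count until every read has reported back, which keeps a core busy for the whole wait. As a hedged alternative, the sketch below uses only JDK classes to show a CountDownLatch parking the calling thread until all asynchronous operations have finished. The simulatedRead method is a hypothetical stand-in for container.readItem and is not part of the Azure Cosmos DB SDK; the fixed charge of 1.0 and the 30-second timeout are likewise assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.DoubleAdder;

final class LatchWaitSketch {

    // Hypothetical stand-in for an asynchronous point read; completes with a fixed charge of 1.0.
    static CompletableFuture<Double> simulatedRead(String id) {
        return CompletableFuture.supplyAsync(() -> 1.0);
    }

    // Starts one simulated read per id and waits, without spinning, until all have completed.
    static double readAllAndWait(List<String> ids) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(ids.size());
        DoubleAdder totalCharge = new DoubleAdder();
        for (String id : ids) {
            simulatedRead(id).whenComplete((charge, error) -> {
                if (error == null) {
                    totalCharge.add(charge);
                }
                latch.countDown(); // count down on success and on failure so the wait always ends
            });
        }
        // Assumed timeout for illustration; the calling thread is parked while it waits.
        if (!latch.await(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Timed out waiting for reads to complete");
        }
        return totalCharge.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ids.add("item-" + i);
        }
        System.out.println("Total simulated charge: " + readAllAndWait(ids));
    }
}
```

Counting down on failure as well as success means a single failed read cannot leave the caller waiting forever, which the counter-based loop does not guarantee.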

Next, we have a method that divides the requests into chunks using the NUMBER_OF_DOCS_PER_THREAD parameter, and each thread will send a list of id and partition key tuples to the database to be executed using readMany:

    private void readManyItemsEnhanced() throws InterruptedException {
        // collect the ids that were generated when writing the data.
        List<String> list = new ArrayList<String>();
        for (final JsonNode doc : docs) {
            list.add(doc.get("id").asText());
        }
        List<List<String>> lists = ListUtils.partition(list, NUMBER_OF_DOCS_PER_THREAD);

        final long startTime = System.currentTimeMillis();
        Flux.fromIterable(lists).flatMap(x -> {

            List<CosmosItemIdentity> pairList = new ArrayList<>();

            // add the id and partition key of each item in this chunk to the list sent to Cosmos DB
            for (final String id : x) {
                pairList.add(new CosmosItemIdentity(new PartitionKey(id), id));
            }

            // instead of reading sequentially, send the id and partition key tuples in one readMany call
            return container.readMany(pairList, Item.class)
                    .doOnNext(response -> {
                        // record the charge without calling block(): blocking and then returning the
                        // same Mono would subscribe to it twice and issue each readMany request twice
                        totalEnhancedRequestCharges.getAndAccumulate(response.getRequestCharge(), Double::sum);
                        // count this chunk's items once the response has arrived,
                        // so that total requests will equal total docs
                        request_count.getAndAdd(x.size());
                    });
        })
                .subscribe();

        logger.info("Waiting while subscribed async operation completes all threads...");
        while (request_count.get() < NUMBER_OF_DOCS) {
            // looping while subscribed async operation completes all threads
        }
        if (request_count.get() == NUMBER_OF_DOCS) {
            request_count.set(0);
            final long endTime = System.currentTimeMillis();
            final long duration = (endTime - startTime);
            totalEnhancedReadLatency.getAndAdd(duration);
        }
    }
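
The chunking step in readManyItemsEnhanced() relies on ListUtils.partition, which presumably comes from Apache Commons Collections. For readers who want to see what that step does without the extra dependency, here is a minimal sketch using only the JDK. The class and method names are hypothetical, and unlike a view-based partition this version copies each chunk.

```java
import java.util.ArrayList;
import java.util.List;

final class ChunkingSketch {

    // Splits items into consecutive chunks of at most chunkSize elements (the last chunk may be smaller).
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end))); // copy so chunks are independent
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ids.add("item-" + i);
        }
        System.out.println(partition(ids, 10).size() + " chunks of 10 ids");
        System.out.println(partition(ids, 100).size() + " chunks of 100 ids");
    }
}
```

With 1,000 ids, a chunk size of 10 yields 100 chunks (and so 100 readMany calls), while a chunk size of 100 yields only 10.
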

First, let’s see what happens when we run the demo and configure just 10 documents per thread for the enhanced readManyItemsEnhanced() method:

    public static final int NUMBER_OF_DOCS = 1000;
    public static final int NUMBER_OF_DOCS_PER_THREAD = 10;

    INFO: Reading many items....
    INFO: Waiting while subscribed async operation completes all threads...
    INFO: Reading many items (enhanced using readMany method)
    INFO: Waiting while subscribed async operation completes all threads...
    INFO: Total latency with standard multi-threading: 907
    INFO: Total latency using readMany method: 2109
    INFO: Total request charges with standard multi-threading: 1000.0
    INFO: Total request charges using readMany method: 927.1299999999991

In this test, latency is higher when using readMany (2,109 ms versus 907 ms). With the local resources available, executing point reads with standard multi-threading performs better than splitting the requests and sending 10 items per readMany call. However, notice that the total request unit (RU) charges are slightly lower with readMany. Now let’s see what happens when we increase NUMBER_OF_DOCS_PER_THREAD to 100:

    public static final int NUMBER_OF_DOCS = 1000;
    public static final int NUMBER_OF_DOCS_PER_THREAD = 100;

    INFO: Reading many items....
    INFO: Waiting while subscribed async operation completes all threads...
    INFO: Reading many items (enhanced using readMany method)
    INFO: Waiting while subscribed async operation completes all threads...
    INFO: Total latency with standard multi-threading: 825
    INFO: Total latency using readMany method: 380
    INFO: Total request charges with standard multi-threading: 1000.0
    INFO: Total request charges using readMany method: 212.79000000000002

We can now see that latency with readMany is significantly lower than with standard multi-threading (380 ms versus 825 ms). Not only that, but the RU charge is far lower for the same number of reads: about 213 RUs versus 1,000. This approach is a great solution if you need to run many point reads in parallel but don’t have a large number of client machines or cores standing by to serve a large “scatter-gather” request, and you need to mitigate the trade-off on overall latency. Keep in mind that the extent to which this approach benefits latency will depend on the average size of each document, so you should benchmark the performance when implementing it.
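
To make the comparison between the two runs concrete, the small sketch below derives per-item RU cost and a latency ratio from the figures logged above. It assumes the latency values are in milliseconds (the sample measures them with System.currentTimeMillis) and rounds the logged charges to two decimals; the class and method names are hypothetical.

```java
final class ResultsSummary {

    // Average request charge per item read.
    static double chargePerItem(double totalCharge, int items) {
        return totalCharge / items;
    }

    // Ratio of baseline latency to candidate latency; values above 1 mean the candidate is faster.
    static double speedup(long baselineMillis, long candidateMillis) {
        return (double) baselineMillis / candidateMillis;
    }

    public static void main(String[] args) {
        int docs = 1000;
        // Figures copied from the two log outputs in this post.
        System.out.printf("10 per call:  %.3f RU/item, latency ratio %.2f%n",
                chargePerItem(927.13, docs), speedup(907, 2109));
        System.out.printf("100 per call: %.3f RU/item, latency ratio %.2f%n",
                chargePerItem(212.79, docs), speedup(825, 380));
    }
}
```

On these figures, readMany costs roughly 0.93 RU per item at 10 documents per call and roughly 0.21 RU per item at 100, and in the second run it is about 2.2 times faster than the multi-threaded point reads.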

Get Started

About Azure Cosmos DB

Azure Cosmos DB is a fast and scalable distributed NoSQL database, built for modern application development. Get guaranteed single-digit millisecond response times and 99.999-percent availability, backed by SLAs, automatic and instant scalability, and open-source APIs for MongoDB and Cassandra. Enjoy fast writes and reads anywhere in the world with turnkey data replication and multi-region writes.

To easily build your first database, watch our Get Started videos on YouTube and explore ways to dev/test free.
