Getting started with Kafka Connector for Azure Cosmos DB using Docker

Abhishek Gupta

Having a local development environment is quite handy when trying out a new service or technology. Docker has emerged as the de facto choice in such cases. It is especially useful in scenarios where you’re trying to integrate multiple services, and it gives you the ability to start fresh before each run.

This blog post is a getting started guide for the Kafka Connector for Azure Cosmos DB. All the components (including Azure Cosmos DB) will run on your local machine, thanks to:

  • The Azure Cosmos DB Linux Emulator, which can be used for local development and testing without creating an Azure subscription or incurring any costs.
  • Docker Compose, a tool for defining and running multi-container Docker applications. It will orchestrate all the components required by our setup, including the Azure Cosmos DB emulator, Kafka, Zookeeper, Kafka connectors, etc.

 

To make things easier, we will pick single-focused scenarios and go step by step:

  • Step 0 – A simple scenario to check if our setup is functional.
  • How to handle streaming JSON data
  • How to handle streaming JSON data which is not compatible with Azure Cosmos DB
  • How to handle Avro data using Schema Registry

It is assumed that you’re comfortable with Kafka and have an understanding of Kafka Connect.

First things first…

… here is a quick overview of the Azure Cosmos DB Emulator and the Kafka Connector.

The Azure Cosmos DB connector allows you to move data between Azure Cosmos DB and Kafka. It’s available as a source as well as a sink. The Azure Cosmos DB Sink connector writes data from a Kafka topic to an Azure Cosmos DB container and the Source connector writes changes from an Azure Cosmos DB container to a Kafka topic. At the time of writing, the connector is in pre-production mode. You can read more about it on the GitHub repo or install/download it from the Confluent Hub.

The Azure Cosmos DB Linux Emulator provides a local environment that emulates the Azure Cosmos DB service for development purposes (currently, it only supports SQL API). It provides a high-fidelity emulation of the Azure Cosmos DB service and supports functionality such as creating data, querying data, provisioning and scaling containers, and executing stored procedures and triggers.

At the time of writing, the Azure Cosmos DB Linux Emulator is in preview.

The emulator documentation covers how to use it on macOS or Linux, how it differs from the Azure Cosmos DB cloud service, how to troubleshoot issues, and more.

Before you start…

Make sure you have Docker and docker-compose installed.

Also, clone the project from GitHub:

git clone https://github.com/Azure-Samples/cosmosdb-kafka-connect-docker
cd cosmosdb-kafka-connect-docker

Start all the services

All the components are defined in the docker-compose file:

  • Azure Cosmos DB emulator
  • Kafka and Zookeeper
  • Azure Cosmos DB and Datagen connectors (run as separate Kafka Connect workers)
  • Confluent Schema Registry

Thanks to Docker Compose, the environment can be brought up with a single command. When you run this for the first time, it may take a while for the containers to be downloaded (subsequent executions are faster). Optionally, you can also download the images separately before starting Docker Compose:

(optional)
docker pull confluentinc/cp-zookeeper:latest
docker pull confluentinc/cp-kafka:latest
docker pull confluentinc/cp-schema-registry:latest

To start all the services:

docker-compose -p cosmosdb-kafka-docker up --build

After a couple of minutes, check the containers:

docker-compose -p cosmosdb-kafka-docker ps

# expected output

               Name                              Command                  State                     Ports               
------------------------------------------------------------------------------------------------------------------------
cosmosdb                              /usr/local/bin/cosmos/start.sh   Up             0.0.0.0:10251->10251/tcp,:::10251-
                                                                                      >10251/tcp, 0.0.0.0:10252->10252/t
                                                                                      cp,:::10252->10252/tcp, 0.0.0.0:10
                                                                                      253->10253/tcp,:::10253->10253/tcp
                                                                                      , 0.0.0.0:10254->10254/tcp,:::1025
                                                                                      4->10254/tcp, 0.0.0.0:8081->8081/t
                                                                                      cp,:::8081->8081/tcp              
cosmosdb-kafka-docker_cosmosdb-       /etc/confluent/docker/run        Up (healthy)   0.0.0.0:8083->8083/tcp,:::8083->80
connector_1                                                                           83/tcp, 9092/tcp                  
cosmosdb-kafka-docker_datagen-        /etc/confluent/docker/run        Up (healthy)   0.0.0.0:8080->8080/tcp,:::8080->80
connector_1                                                                           80/tcp, 8083/tcp, 9092/tcp        
kafka                                 /etc/confluent/docker/run        Up             0.0.0.0:29092->29092/tcp,:::29092-
                                                                                      >29092/tcp, 0.0.0.0:9092->9092/tcp
                                                                                      ,:::9092->9092/tcp                
schema-registry                       /etc/confluent/docker/run        Up             0.0.0.0:9090->8081/tcp,:::9090->80
                                                                                      81/tcp                            
zookeeper                             /etc/confluent/docker/run        Up             0.0.0.0:2181->2181/tcp,:::2181->21
                                                                                      81/tcp, 2888/tcp, 3888/tcp
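
If you would rather not eyeball the listing, the wait can be scripted. Here is a minimal sketch that polls a Kafka Connect REST endpoint until it answers with HTTP 200. The helper name wait_for_http and its retry defaults are my own assumptions and are not part of the sample repo; the ports (8083 and 8080) are the ones shown in the listing above.

```shell
# Hypothetical helper: poll a URL until it answers with HTTP 200, or give up.
#   $1 = URL, $2 = max attempts (assumed default 30), $3 = delay in seconds (assumed default 2)
wait_for_http() {
  url="$1"
  attempts="${2:-30}"
  delay="${3:-2}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # -w '%{http_code}' prints only the status code; curl reports 000 if the connection fails
    code=$(curl -s -o /dev/null -w '%{http_code}' "$url")
    if [ "$code" = "200" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Usage:
#   wait_for_http http://localhost:8083/connectors && echo "Cosmos DB Connect worker is ready"
#   wait_for_http http://localhost:8080/connectors && echo "Datagen Connect worker is ready"
```

This only tells you that the Connect REST API is answering. It says nothing about the emulator, which is why the certificate and database steps below are still done by hand.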

Once all the services are up and running, the next logical step is to install the connector, right? Well, there are a couple of things we need to take care of first. For Java apps to connect to the Azure Cosmos DB emulator, you need to have its certificate installed in the Java certificate store. In this case, we will seed the certificate from the Azure Cosmos DB emulator container to the Cosmos DB Kafka Connect container.

Although this process can be automated, I am doing it manually to make things clear.

Configure Azure Cosmos DB Emulator Certificates

Execute this command to store the certificate in the Java certificate store (using docker exec):

docker exec --user root -it cosmosdb-kafka-docker_cosmosdb-connector_1 /bin/bash

# execute the below command inside the container
curl -k https://cosmosdb:8081/_explorer/emulator.pem > ~/emulatorcert.crt && keytool -noprompt -storepass changeit -keypass changeit -keystore /usr/lib/jvm/zulu11-ca/lib/security/cacerts -importcert -alias emulator_cert -file ~/emulatorcert.crt

You should see this output – Certificate was added to keystore
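
If the connector later fails with an SSL handshake error, it is worth confirming that the certificate really landed in the keystore. A small sketch using keytool -list follows. The function name has_cert_alias is hypothetical; the keystore path, password and alias are the ones used in the import command above.

```shell
# Hypothetical helper: succeeds if the given alias exists in the given Java keystore.
#   $1 = keystore path, $2 = alias, $3 = store password (defaults to the one used above)
has_cert_alias() {
  keystore="$1"
  alias_name="$2"
  storepass="${3:-changeit}"
  # keytool -list exits with a non-zero status when the alias is not present
  keytool -list -keystore "$keystore" -storepass "$storepass" -alias "$alias_name" >/dev/null 2>&1
}

# Usage (inside the cosmosdb-connector container):
#   has_cert_alias /usr/lib/jvm/zulu11-ca/lib/security/cacerts emulator_cert \
#     && echo "emulator certificate is present"
```

Run it in the same container shell in which you imported the certificate, since that is the JVM the connector uses.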

And, one last thing before we proceed…

Create Azure Cosmos DB database and containers

Access the Azure Cosmos DB emulator portal at https://localhost:8081/_explorer/index.html and create the below resources:

  • Database named testdb
  • Containers – inventory, orders, orders_avro (ensure that the partition key for all the containers is /id)

Let’s explore all the scenarios

To start off let’s look at the basic scenario. Before trying out other things, we want to make sure everything is functional.

1. Hello world!

Start the inventory data connector for Cosmos DB:

curl -X POST -H "Content-Type: application/json" -d @cosmosdb-inventory-connector_1.json http://localhost:8083/connectors

# to check the connector status
curl http://localhost:8083/connectors/inventory-sink/status
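
The status call returns a JSON document with a state for the connector and for each of its tasks. As a rough sketch, the check below treats the connector as healthy when at least one state is RUNNING and none is FAILED. The function name connector_healthy is my own, and the string matching assumes the compact JSON (no spaces around the colon) that the Connect REST API normally returns.

```shell
# Hypothetical helper: inspect a Kafka Connect status document passed as $1.
# Returns 0 if something is RUNNING and nothing is FAILED, 1 otherwise.
connector_healthy() {
  status_json="$1"
  case "$status_json" in
    *'"state":"FAILED"'*)  return 1 ;;  # connector or one of its tasks has failed
    *'"state":"RUNNING"'*) return 0 ;;  # at least one RUNNING state, no FAILED state
    *)                     return 1 ;;  # unknown connector, still starting, paused, etc.
  esac
}

# Usage:
#   connector_healthy "$(curl -s http://localhost:8083/connectors/inventory-sink/status)" \
#     && echo "inventory-sink is running"
```

It is deliberately crude: a connector that is RUNNING but has not yet been assigned any task will also pass, so give it a few seconds before checking.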

To test the end-to-end flow, send a few records to the inventory_topic topic in Kafka:

docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-producer --topic inventory_topic --bootstrap-server kafka:29092'

Once the prompt is ready, send the JSON records one by one:

{"id": "5000","quantity": 100,"productid": 42}
{"id": "5001","quantity": 99,"productid": 43}
{"id": "5002","quantity": 98,"productid": 44}

Check the Cosmos DB container to confirm that the records have been saved. Navigate to the portal at https://localhost:8081/_explorer/index.html and check the inventory container.

Ok, it worked! Let’s go ahead and do something slightly more useful. Before moving on, delete the inventory connector.

curl -X DELETE http://localhost:8083/connectors/inventory-sink/

2. Sync streaming data (JSON format) from Kafka to Azure Cosmos DB

For the remaining scenarios, we will use a producer component to generate records. The Kafka Connect Datagen connector is our friend. It is meant for generating mock data for testing, so let’s put it to good use!

Start an instance of the Azure Cosmos DB connector:

curl -X POST -H "Content-Type: application/json" -d @cosmosdb-inventory-connector_2.json http://localhost:8083/connectors

# to check the connector status
curl http://localhost:8083/connectors/inventory-sink/status

Once it’s ready, go ahead and start the Datagen connector which will generate mock inventory data in JSON format:

curl -X POST -H "Content-Type: application/json" -d @datagen-inventory-connector.json http://localhost:8080/connectors

# to check the connector status
curl http://localhost:8080/connectors/datagen-inventory/status

Note that we use port 8080 for the Datagen connector since it’s running in a separate Kafka Connect container.

To look at the data produced by the Datagen connector, check the inventory_topic1 Kafka topic:

docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-consumer --topic inventory_topic1 --bootstrap-server kafka:29092'

Notice the data (it may be different in your case):

{"id":5,"quantity":5,"productid":5}
{"id":6,"quantity":6,"productid":6}
{"id":7,"quantity":7,"productid":7}
...

Note that id has an Integer value.

Check the Azure Cosmos DB container to confirm that the records have been saved. Navigate to the portal at https://localhost:8081/_explorer/index.html and check the inventory container.

The records in Cosmos DB have an id attribute of String data type. The original data in the Kafka topic had an id attribute of Integer type, which would not have worked as-is, since Azure Cosmos DB requires id to be a unique user-defined string. The conversion was made possible by a Kafka Connect transform: Cast updates fields (or the entire key or value) to a specific type, updating the schema if one is present.

Here is the part in the connector configuration which did the trick:

"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "id:string"

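To make the effect of the Cast transform concrete, here is a small sketch that mimics it on the command line with sed. This is only an illustration of the before/after shape of a record. The real transform runs inside the Kafka Connect worker on structured records, not on text, and the function name cast_id_to_string is hypothetical.

```shell
# Mimics the effect of "transforms.Cast.spec": "id:string" on a JSON record read from stdin.
# Only a top-level numeric "id" is quoted; "productid" is untouched because the
# pattern requires a double quote immediately before id.
cast_id_to_string() {
  sed -E 's/"id":([0-9]+)/"id":"\1"/'
}

# Usage:
#   echo '{"id":5,"quantity":5,"productid":5}' | cast_id_to_string
#   prints: {"id":"5","quantity":5,"productid":5}
```
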
Before moving on, delete the Cosmos DB and Datagen inventory connectors.

curl -X DELETE http://localhost:8080/connectors/datagen-inventory
curl -X DELETE http://localhost:8083/connectors/inventory-sink/

3. Push streaming Orders data (JSON format) from Kafka to Azure Cosmos DB

Now, let’s switch gears and use the same data format (JSON), but with a slight twist. We will use a variant of the Datagen connector to generate mock orders data and adjust the Cosmos DB connector as well.

To install a different instance of the Azure Cosmos DB connector:

curl -X POST -H "Content-Type: application/json" -d @cosmosdb-orders-connector_1.json http://localhost:8083/connectors

# to check the connector status
curl http://localhost:8083/connectors/orders-sink/status

Install the Datagen orders connector:

curl -X POST -H "Content-Type: application/json" -d @datagen-orders-connector.json http://localhost:8080/connectors

# to check the connector status
curl http://localhost:8080/connectors/datagen-orders/status

To look at the data produced by the Datagen connector, check the orders_topic Kafka topic:

docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-consumer --topic orders_topic --bootstrap-server kafka:29092'

Notice the data (it may be different in your case):

{"ordertime":1496251410176,"orderid":3,"itemid":"Item_869","orderunits":3.2897805449886226,"address":{"city":"City_99","state":"State_46","zipcode":50570}}

{"ordertime":1500129505219,"orderid":4,"itemid":"Item_339","orderunits":3.6719921257659918,"address":{"city":"City_84","state":"State_55","zipcode":88573}}

{"ordertime":1498873571020,"orderid":5,"itemid":"Item_922","orderunits":8.4506812669258,"address":{"city":"City_48","state":"State_66","zipcode":55218}}

{"ordertime":1513855504436,"orderid":6,"itemid":"Item_545","orderunits":7.82561522361042,"address":{"city":"City_44","state":"State_71","zipcode":87868}}
...

I chose Orders data on purpose since it is different from the Inventory data. Notice that the JSON record produced by the Datagen connector has an orderid attribute (Integer data type) but no id attribute, and we know that Azure Cosmos DB won’t work without one.

Check the Cosmos DB container to confirm that the records have been saved. Navigate to the portal at https://localhost:8081/_explorer/index.html and check the orders container.

Notice that there is no orderid attribute in the records stored in Azure Cosmos DB. In fact, it has been replaced by the id attribute (with a String value). This is achieved by the ReplaceField transform, followed by the same Cast transform as before.

Here is the part in the connector configuration which made this possible:

"transforms": "RenameField,Cast",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "orderid:id",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "id:string"

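The order of the two transforms matters: the field is renamed first, and only then is the new id field cast to a string. The sketch below mimics that chain with two sed stages. As before, it only illustrates what happens to the shape of a record; the real transforms operate on structured records inside the Connect worker, and the function names are hypothetical.

```shell
# Stage 1: mimics "transforms.RenameField.renames": "orderid:id"
rename_orderid_to_id() {
  sed -E 's/"orderid":/"id":/'
}

# Stage 2: mimics "transforms.Cast.spec": "id:string"
cast_id_to_string() {
  sed -E 's/"id":([0-9]+)/"id":"\1"/'
}

# The chain, in the same order as "transforms": "RenameField,Cast"
rename_then_cast() {
  rename_orderid_to_id | cast_id_to_string
}

# Usage:
#   echo '{"ordertime":1496251410176,"orderid":3,"itemid":"Item_869"}' | rename_then_cast
#   prints: {"ordertime":1496251410176,"id":"3","itemid":"Item_869"}
```

If the stages were swapped, the cast would find no id field to convert, and the renamed id would stay an Integer.
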
Depending on your use case, removing/renaming a field altogether may not be an ideal solution. However, it’s good to know that there are options. Also, remember that the original data in Kafka topics is still there, untouched. Other downstream apps can still leverage it.

Before moving on, delete the Cosmos DB and Datagen orders connectors.

curl -X DELETE http://localhost:8080/connectors/datagen-orders
curl -X DELETE http://localhost:8083/connectors/orders-sink/

4. Push streaming Orders data (AVRO format) from Kafka to Azure Cosmos DB

So far we have dealt with JSON, a commonly used data format. But Avro is heavily used in production due to its compact format, which leads to better performance and cost savings. To make it easier to deal with Avro data schemas, there is Confluent Schema Registry, which provides a serving layer for your metadata along with a RESTful interface for storing and retrieving your Avro schemas (as well as JSON and Protobuf schemas). We will use the Docker version for the purposes of this blog post.

Install a new instance of the Azure Cosmos DB connector that can handle Avro data:

curl -X POST -H "Content-Type: application/json" -d @cosmosdb-orders-connector_2.json http://localhost:8083/connectors

# to check the connector status
curl http://localhost:8083/connectors/orders-sink/status

Install the Datagen connector which will generate mock orders data in Avro format:

curl -X POST -H "Content-Type: application/json" -d @datagen-orders-connector-avro.json http://localhost:8080/connectors

# to check the connector status
curl http://localhost:8080/connectors/datagen-orders/status

To look at the Avro data produced by the Datagen connector, check the orders_avro_topic Kafka topic:

docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-consumer --topic orders_avro_topic --bootstrap-server kafka:29092'

Since the Avro data is in binary format, it’s not human readable:

�����VItem_185lqf�@City_61State_73��
����WItem_219[�C��@City_74State_77��
�����VItem_7167Ix�dF�?City_53State_53��
���֩WItem_126*���?@City_58State_21��
�����VItem_329X�2,@City_49State_79��
�����XItem_886��>�|�@City_88State_27��
��V Item_956�r#�!@City_45State_96��
�ѼҕW"Item_157E�)$���?City_96State_63��
...

Check the Cosmos DB container to confirm that the records have been saved. Navigate to the portal at https://localhost:8081/_explorer/index.html and check the orders_avro container.

Great, things work as expected! The connector configuration was updated to handle this:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
...

The changes include choosing the AvroConverter, enabling schemas and pointing to Schema Registry (in our case running locally in Docker).
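
To see the schema that the converter relies on, you can ask Schema Registry directly over its REST API. The sketch below assumes the default subject naming (topic name followed by -value) and the host port 9090, which is the port mapped to the schema-registry container in the listing shown earlier. The function names are my own.

```shell
# Subject name for a topic's value schema under the default naming strategy (assumed here).
value_subject() {
  printf '%s-value\n' "$1"
}

# Hypothetical helper: fetch the latest registered value schema for a topic.
#   $1 = topic, $2 = Schema Registry base URL (defaults to the host port from the listing)
latest_value_schema() {
  topic="$1"
  registry="${2:-http://localhost:9090}"
  curl -s "$registry/subjects/$(value_subject "$topic")/versions/latest"
}

# Usage:
#   curl -s http://localhost:9090/subjects        # list all registered subjects
#   latest_value_schema orders_avro_topic         # show the schema used for the orders data
```

If the subject list comes back empty, the Datagen connector has most likely not produced any Avro records yet.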

That is all for the use cases covered in this blog post. We only covered the Sink connector, but feel free to explore and experiment further! For example, you could extend the current setup to include the Source connector and configure it to send records from Azure Cosmos DB containers to Kafka.

Clean up

Once you have finished, you can delete the connectors:

curl -X DELETE http://localhost:8080/connectors/datagen-orders
curl -X DELETE http://localhost:8083/connectors/orders-sink/
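
If you have lost track of which connectors are still installed, the Connect REST API can list them. Here is a rough sketch that lists the connectors on a worker and deletes each one. The function names are hypothetical, and the JSON handling is intentionally naive: it assumes connector names without commas, brackets, quotes or spaces, which holds for the names used in this post.

```shell
# Turn the JSON array returned by GET /connectors (for example ["a","b"]) into one name per line.
connector_names() {
  sed -e 's/[][" ]//g' | tr ',' '\n' | sed '/^$/d'
}

# Hypothetical helper: delete every connector on the Connect worker at base URL $1.
delete_all_connectors() {
  base="$1"
  curl -s "$base/connectors" | connector_names | while read -r name; do
    curl -s -X DELETE "$base/connectors/$name"
  done
}

# Usage:
#   delete_all_connectors http://localhost:8080   # Datagen worker
#   delete_all_connectors http://localhost:8083   # Cosmos DB worker
```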

To stop all the Docker components:

docker-compose -p cosmosdb-kafka-docker down -v

Conclusion

Although we covered simple scenarios for demonstration purposes, it goes to show how you can leverage off-the-shelf solutions (Connectors, Transformers, Schema Registry etc.) and focus on the heavy lifting that your Azure Cosmos DB based application or data pipeline requires. Since this example adopts a Docker-based approach for local development, it is cost-effective (well, free!) and can be easily customised to suit your requirements.

For production scenarios, you would need to set up, configure and operate these connectors. Kafka Connect workers are simply JVM processes and are inherently stateless (all the state handling is offloaded to Kafka). There is a lot of flexibility in terms of your overall architecture as well as orchestration – for instance, you could run them in Kubernetes for fault-tolerance and scalability.

2 comments

  • Alexander Herrmann

    Thanks for the article! Unfortunately I am unable to get it running. I can import the CosmosDB emulator certificate as mentioned, but when I try to create the connector it fails with an SSL handshake exception. I guess either the certificate is not correctly imported or the keystore is wrong. Do you have any advice? Thanks!

    • Abhishek Gupta (Microsoft employee)

      Hi Alexander – can you introspect the keystore and confirm whether the certificate got imported as expected? Use the “keytool -list” command for this.