Having a local development environment is quite handy when trying out a new service or technology, and Docker has emerged as the de-facto choice in such cases. It is especially useful in scenarios where you’re trying to integrate multiple services, and it gives you the ability to start fresh before each run.
This blog post is a getting started guide for the Kafka Connector for Azure Cosmos DB. All the components (including Azure Cosmos DB) will run on your local machine, thanks to:
- The Azure Cosmos DB Linux Emulator, which can be used for local development and testing without creating an Azure subscription or incurring any costs.
- Docker Compose, a tool for defining and running multi-container Docker applications. It will orchestrate all the components required by our setup, including the Azure Cosmos DB emulator, Kafka, Zookeeper, the Kafka connectors, etc.
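For reference, the compose file wires the services together roughly like this. This is an abridged sketch – the image names and options shown are illustrative, and the actual docker-compose file in the repo carries the full configuration (environment variables, health checks, build contexts, etc.):

```yaml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on: [zookeeper]
  cosmosdb:
    # Azure Cosmos DB Linux Emulator
    image: mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator
    ports: ["8081:8081"]
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    ports: ["9090:8081"]
  cosmosdb-connector:
    # Kafka Connect worker running the Cosmos DB connector
    ports: ["8083:8083"]
  datagen-connector:
    # Kafka Connect worker running the Datagen connector
    ports: ["8080:8080"]
```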
To make things easier, we will pick single-focused scenarios and go step by step:
- Step 0 – A simple scenario to check whether our setup is functional
- How to handle streaming JSON data
- How to handle streaming JSON data which is not compatible with Azure Cosmos DB
- How to handle Avro data using Schema Registry
First things first…
… here is a quick overview of the Azure Cosmos DB Emulator and the Kafka Connector.
The Azure Cosmos DB connector allows you to move data between Azure Cosmos DB and Kafka. It’s available as both a source and a sink. The Azure Cosmos DB Sink connector writes data from a Kafka topic to an Azure Cosmos DB container, and the Source connector writes changes from an Azure Cosmos DB container to a Kafka topic. At the time of writing, the connector is in pre-production mode. You can read more about it on the GitHub repo or install/download it from the Confluent Hub.
The Azure Cosmos DB Linux Emulator provides a local environment that emulates the Azure Cosmos DB service for development purposes (currently, it only supports SQL API). It provides a high-fidelity emulation of the Azure Cosmos DB service and supports functionality such as creating data, querying data, provisioning and scaling containers, and executing stored procedures and triggers.
You can dive into how to use the emulator on macOS or Linux, how it differs from the Azure Cosmos DB cloud service, how to troubleshoot issues, etc.
Before you start…
Make sure you have Docker and docker-compose installed.
Also, clone the project from GitHub:
git clone https://github.com/Azure-Samples/cosmosdb-kafka-connect-docker
cd cosmosdb-kafka-connect-docker
Start all the services
All the components are defined in the docker-compose file:
- Azure Cosmos DB emulator
- Kafka and Zookeeper
- Azure Cosmos DB and Datagen connectors (run as separate Kafka Connect workers)
- Confluent Schema Registry
Thanks to Docker Compose, the environment can be brought up with a single command. When you run this for the first time, it may take a while for the containers to be downloaded (subsequent executions are faster). Optionally, you can also download the images separately before starting Docker Compose:
# optional
docker pull confluentinc/cp-zookeeper:latest
docker pull confluentinc/cp-kafka:latest
docker pull confluentinc/cp-schema-registry:latest
To start all the services:
docker-compose -p cosmosdb-kafka-docker up --build
After a couple of minutes, check the containers:
docker-compose -p cosmosdb-kafka-docker ps

# expected output
Name                                         Command                          State          Ports
---------------------------------------------------------------------------------------------------------------------
cosmosdb                                     /usr/local/bin/cosmos/start.sh   Up             0.0.0.0:10251->10251/tcp, 0.0.0.0:10252->10252/tcp, 0.0.0.0:10253->10253/tcp, 0.0.0.0:10254->10254/tcp, 0.0.0.0:8081->8081/tcp
cosmosdb-kafka-docker_cosmosdb-connector_1   /etc/confluent/docker/run        Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
cosmosdb-kafka-docker_datagen-connector_1    /etc/confluent/docker/run        Up (healthy)   0.0.0.0:8080->8080/tcp, 8083/tcp, 9092/tcp
kafka                                        /etc/confluent/docker/run        Up             0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
schema-registry                              /etc/confluent/docker/run        Up             0.0.0.0:9090->8081/tcp
zookeeper                                    /etc/confluent/docker/run        Up             0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
Once all the services are up and running, the next logical step is to install the connector, right? Well, there are a couple of things we need to take care of first. For Java apps to connect to the Azure Cosmos DB emulator, you need to have certificates installed in the Java certificate store. In this case, we will seed the certificate from the Azure Cosmos DB emulator container to the Cosmos DB Kafka Connect container.
Configure Azure Cosmos DB Emulator Certificates
Execute this command to store the certificate in the Java certificate store (using docker exec):

docker exec --user root -it cosmosdb-kafka-docker_cosmosdb-connector_1 /bin/bash

# execute the below command inside the container
curl -k https://cosmosdb:8081/_explorer/emulator.pem > ~/emulatorcert.crt && keytool -noprompt -storepass changeit -keypass changeit -keystore /usr/lib/jvm/zulu11-ca/lib/security/cacerts -importcert -alias emulator_cert -file ~/emulatorcert.crt
And, one last thing before we proceed…
Create Azure Cosmos DB database and containers
Access the Azure Cosmos DB emulator portal at https://localhost:8081/_explorer/index.html and create the below resources:
- Database named testdb
- Containers – inventory, orders, orders_avro (ensure that the partition key for all the containers is /id)
Let’s explore all the scenarios
To start off, let’s look at the basic scenario. Before trying out other things, we want to make sure everything is functional.
1. Hello world!
Start the inventory data connector for Cosmos DB:
curl -X POST -H "Content-Type: application/json" -d @cosmosdb-inventory-connector_1.json http://localhost:8083/connectors

# to check the connector status
curl http://localhost:8083/connectors/inventory-sink/status
To test the end-to-end flow, send a few records to the inventory_topic topic in Kafka:
docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-producer --topic inventory_topic --bootstrap-server kafka:29092'
Once the prompt is ready, send the JSON records one by one:
{"id": "5000","quantity": 100,"productid": 42}
{"id": "5001","quantity": 99,"productid": 43}
{"id": "5002","quantity": 98,"productid": 44}
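Note that each record already carries a string id, which is exactly what Azure Cosmos DB expects on every document. A quick sanity check in plain Python (a hypothetical helper, not part of the connector) makes the requirement explicit:

```python
import json

# The sample records sent to inventory_topic above
records = [
    '{"id": "5000","quantity": 100,"productid": 42}',
    '{"id": "5001","quantity": 99,"productid": 43}',
    '{"id": "5002","quantity": 98,"productid": 44}',
]

for line in records:
    doc = json.loads(line)
    # Cosmos DB requires a unique, user-defined string "id" on every document
    assert isinstance(doc["id"], str), f"id must be a string: {doc}"

print("all records are Cosmos DB-compatible")
```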
Check the Cosmos DB container to confirm that the records have been saved. Navigate to the portal at https://localhost:8081/_explorer/index.html and check the inventory container:
Ok, it worked! Let’s go ahead and do something slightly more useful. Before moving on, delete the inventory connector.
curl -X DELETE http://localhost:8083/connectors/inventory-sink/
2. Sync streaming data (JSON format) from Kafka to Azure Cosmos DB
For the remaining scenarios, we will use a producer component to generate records. The Kafka Connect Datagen connector is our friend. It is meant for generating mock data for testing, so let’s put it to good use!
Start an instance of the Azure Cosmos DB connector:
curl -X POST -H "Content-Type: application/json" -d @cosmosdb-inventory-connector_2.json http://localhost:8083/connectors

# to check the connector status
curl http://localhost:8083/connectors/inventory-sink/status
Once it’s ready, go ahead and start the Datagen connector which will generate mock inventory data in JSON format:
curl -X POST -H "Content-Type: application/json" -d @datagen-inventory-connector.json http://localhost:8080/connectors

# to check the connector status
curl http://localhost:8080/connectors/datagen-inventory/status
To look at the data produced by the Datagen connector, check the inventory_topic1 Kafka topic:
docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-consumer --topic inventory_topic1 --bootstrap-server kafka:29092'
Notice the data (it may be different in your case):

{"id":5,"quantity":5,"productid":5}
{"id":6,"quantity":6,"productid":6}
{"id":7,"quantity":7,"productid":7}
...
Check the Azure Cosmos DB container to confirm that the records have been saved. Navigate to the portal at https://localhost:8081/_explorer/index.html and check the inventory container:
The records in Cosmos DB have an id attribute of String data type. The original data in the Kafka topic had an id attribute of Integer type – but that would not have worked, since Azure Cosmos DB requires id to be a unique user-defined string. This conversion was made possible by a Kafka Connect transform – Cast updates fields (or the entire key or value) to a specific type, updating the schema if one is present.
Here is the part in the connector configuration which did the trick:
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "id:string"
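The effect of this transform on a record value can be sketched in plain Python. The cast_value helper below is a hypothetical illustration of what the Cast$Value transform does for a spec like "id:string" – it is not the connector's actual code, and only the string target type is covered:

```python
def cast_value(record: dict, spec: str) -> dict:
    """Mimic Cast$Value for a spec like "id:string" (illustrative sketch)."""
    out = dict(record)
    for entry in spec.split(","):
        field, target = entry.split(":")
        if target == "string":
            # convert the field's value to a string, leaving other fields alone
            out[field] = str(out[field])
    return out

# The Datagen record carries an integer id ...
before = {"id": 5, "quantity": 5, "productid": 5}
# ... which the transform turns into the string Cosmos DB requires
after = cast_value(before, "id:string")
print(after)  # {'id': '5', 'quantity': 5, 'productid': 5}
```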
Before moving on, delete the Cosmos DB and Datagen inventory connectors.
curl -X DELETE http://localhost:8080/connectors/datagen-inventory
curl -X DELETE http://localhost:8083/connectors/inventory-sink/
3. Push streaming Orders data (JSON format) from Kafka to Azure Cosmos DB
Now, let’s switch gears and use the same JSON-formatted data, but with a slight twist. We will use a variant of the Datagen connector to generate mock orders data and adjust the Cosmos DB connector as well.
To install a different instance of the Azure Cosmos DB connector:
curl -X POST -H "Content-Type: application/json" -d @cosmosdb-orders-connector_1.json http://localhost:8083/connectors

# to check the connector status
curl http://localhost:8083/connectors/orders-sink/status
Install the Datagen orders connector:
curl -X POST -H "Content-Type: application/json" -d @datagen-orders-connector.json http://localhost:8080/connectors

# to check the connector status
curl http://localhost:8080/connectors/datagen-orders/status
To look at the data produced by the Datagen connector, check the orders_topic Kafka topic:
docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-consumer --topic orders_topic --bootstrap-server kafka:29092'
Notice the data (it may be different in your case):

{"ordertime":1496251410176,"orderid":3,"itemid":"Item_869","orderunits":3.2897805449886226,"address":{"city":"City_99","state":"State_46","zipcode":50570}}
{"ordertime":1500129505219,"orderid":4,"itemid":"Item_339","orderunits":3.6719921257659918,"address":{"city":"City_84","state":"State_55","zipcode":88573}}
{"ordertime":1498873571020,"orderid":5,"itemid":"Item_922","orderunits":8.4506812669258,"address":{"city":"City_48","state":"State_66","zipcode":55218}}
{"ordertime":1513855504436,"orderid":6,"itemid":"Item_545","orderunits":7.82561522361042,"address":{"city":"City_44","state":"State_71","zipcode":87868}}
...
I chose Orders data on purpose since it is different compared to the Inventory data. Notice that the JSON record produced by the Datagen connector has an orderid attribute (Integer data type), but no id attribute – and we know that Azure Cosmos DB won’t work without one.
Check the Cosmos DB container to confirm that the records have been saved. Navigate to the portal at https://localhost:8081/_explorer/index.html and check the orders container:
Notice that there is no orderid attribute in the records stored in Azure Cosmos DB. In fact, it has been replaced by an id attribute (with a String value). This is achieved by the ReplaceField transform.
Here is the part in the connector configuration which made this possible:
"transforms": "RenameField,Cast",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "orderid:id",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "id:string"
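The two transforms run as a chain, in the order they are listed: rename first, then cast. The helpers below are hypothetical Python sketches of that behaviour (not the connectors' actual implementation):

```python
def rename_fields(record: dict, renames: str) -> dict:
    """Mimic ReplaceField$Value's "renames" option, e.g. "orderid:id" (sketch)."""
    mapping = dict(pair.split(":") for pair in renames.split(","))
    return {mapping.get(key, key): value for key, value in record.items()}

def cast_value(record: dict, spec: str) -> dict:
    """Mimic Cast$Value for "field:string" specs (sketch)."""
    out = dict(record)
    for entry in spec.split(","):
        field, target = entry.split(":")
        if target == "string":
            out[field] = str(out[field])
    return out

order = {"ordertime": 1496251410176, "orderid": 3, "itemid": "Item_869"}
# RenameField runs first (orderid -> id), then Cast (id -> string)
doc = cast_value(rename_fields(order, "orderid:id"), "id:string")
print(doc)  # {'ordertime': 1496251410176, 'id': '3', 'itemid': 'Item_869'}
```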
Before moving on, delete the Cosmos DB and Datagen orders connectors.
curl -X DELETE http://localhost:8080/connectors/datagen-orders
curl -X DELETE http://localhost:8083/connectors/orders-sink/
4. Push streaming Orders data (AVRO format) from Kafka to Azure Cosmos DB
So far we dealt with JSON, a commonly used data format. But Avro is heavily used in production due to its compact format, which leads to better performance and cost savings. To make it easier to deal with Avro data schemas, there is Confluent Schema Registry, which provides a serving layer for your metadata along with a RESTful interface for storing and retrieving Avro (as well as JSON and Protobuf) schemas. We will use the Docker version for the purposes of this blog post.
Install a new instance of the Azure Cosmos DB connector that can handle Avro data:
curl -X POST -H "Content-Type: application/json" -d @cosmosdb-orders-connector_2.json http://localhost:8083/connectors

# to check the connector status
curl http://localhost:8083/connectors/orders-sink/status
Install the Datagen connector which will generate mock orders data in Avro format:
curl -X POST -H "Content-Type: application/json" -d @datagen-orders-connector-avro.json http://localhost:8080/connectors

# to check the connector status
curl http://localhost:8080/connectors/datagen-orders/status
To look at the Avro data produced by the Datagen connector, check the orders_avro_topic Kafka topic:
docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-consumer --topic orders_avro_topic --bootstrap-server kafka:29092'
Since the Avro data is in binary format, it’s not human-readable:
�����VItem_185lqf�@City_61State_73�� ����WItem_219[�C��@City_74State_77�� �����VItem_7167Ix�dF�?City_53State_53�� ���֩WItem_126*���?@City_58State_21�� �����VItem_329X�2,@City_49State_79�� �����XItem_886��>�|�@City_88State_27�� ��V Item_956�r#�!@City_45State_96�� �ѼҕW"Item_157E�)$���?City_96State_63�� ...
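Part of Avro's compactness comes from its binary encoding: per the Avro specification, int and long values are written as zigzag-encoded variable-length integers rather than as text. A minimal sketch of that encoding (hand-rolled here for illustration, not taken from any Avro library):

```python
def avro_encode_long(n: int) -> bytes:
    """Encode an integer the way the Avro spec defines for int/long:
    zigzag encoding (so small magnitudes get small codes) followed by
    a base-128 varint (7 data bits per byte, high bit = "more follows")."""
    z = (n << 1) ^ (n >> 63)  # zigzag: 0,-1,1,-2,... -> 0,1,2,3,...
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # continuation bit set
        else:
            out.append(byte)
            return bytes(out)

# An orderid of 3 takes a single byte in Avro ...
print(avro_encode_long(3).hex())  # 06
# ... and even a millisecond timestamp like an ordertime fits in 6 bytes,
# versus 13 characters of JSON text
print(len(avro_encode_long(1496251410176)))  # 6
```

This is why the Avro records above look like dense binary noise on the console: the field names live in the registered schema, and the values are packed tightly.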
Check the Cosmos DB container to confirm that the records have been saved. Navigate to the portal at https://localhost:8081/_explorer/index.html and check the orders_avro container:
Great, things work as expected! The connector configuration was updated to handle this:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
...
The changes include choosing the AvroConverter, enabling schemas, and pointing to the Schema Registry (in our case, running locally in Docker).
That is all for the use cases covered in this blog post. We only covered the Sink connector, but feel free to explore and experiment further! For example, you could extend the current setup to include the Source connector and configure it to send records from Azure Cosmos DB containers to Kafka.
Clean up
Once you have finished, you can delete the connectors:
curl -X DELETE http://localhost:8080/connectors/datagen-orders
curl -X DELETE http://localhost:8083/connectors/orders-sink/
To stop all the Docker components:
docker-compose -p cosmosdb-kafka-docker down -v
Conclusion
Although we covered simple scenarios for demonstration purposes, they go to show how you can leverage off-the-shelf solutions (connectors, transforms, Schema Registry, etc.) and focus on the heavy lifting that your Azure Cosmos DB based application or data pipeline requires. Since this example adopts a Docker-based approach for local development, it is cost-effective (well, free!) and can be easily customised to suit your requirements.
For production scenarios, you would need to set up, configure, and operate these connectors. Kafka Connect workers are simply JVM processes and thus inherently stateless (all the state handling is offloaded to Kafka). There is a lot of flexibility in terms of your overall architecture as well as orchestration – for instance, you could run them in Kubernetes for fault tolerance and scalability.
Thanks for the article! Unfortunately, I am unable to get it running. I can import the Cosmos DB emulator certificate as mentioned, but when I try to create the connector it fails with an SSL handshake exception. I guess either the certificate is not correctly imported or the keystore is wrong. Do you have any advice? Thanks!
Hi Alexander – can you introspect the keystore and confirm whether the certificate got imported as expected? Use the "keytool -list" command for this.