May 19th, 2022

Announcing the stable release of the Azure Schema Registry client libraries

Deyaaeldeen Almahallawi
Software Engineer

Azure Schema Registry is a feature of Azure Event Hubs. It provides a central repository for schemas for event-driven and messaging-centric applications, which often carry structured data payloads in events or messages. Producers use a schema to serialize the payload before publishing it to a broker like Azure Event Hubs; consumers get the payload from the broker and deserialize it using the same schema. For example, a commonly used schema format is Apache Avro. Schema Registry allows producers and consumers to exchange data without having to manage and share the schema themselves. Read more about this feature at Azure Schema Registry in Azure Event Hubs.
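The producer/consumer flow described above can be sketched without any Azure dependency. The following is a toy, in-memory illustration only: the `InMemoryRegistry` class and the `produce` and `consume` helpers are hypothetical names, JSON stands in for Avro encoding, and none of this is the Azure Schema Registry API. It only shows how passing a schema ID alongside the payload lets both sides agree on a schema without exchanging the schema itself.

```python
import hashlib
import json


class InMemoryRegistry:
    """Hypothetical stand-in for a schema registry (not the Azure API)."""

    def __init__(self):
        self._schemas = {}

    def register(self, definition: str) -> str:
        # Derive a stable ID from the schema text so re-registering is idempotent.
        schema_id = hashlib.sha256(definition.encode("utf-8")).hexdigest()[:32]
        self._schemas[schema_id] = definition
        return schema_id

    def get(self, schema_id: str) -> str:
        return self._schemas[schema_id]


def produce(registry, definition, payload):
    """Producer side: register the schema, then ship only its ID with the bytes."""
    schema_id = registry.register(definition)
    body = json.dumps(payload).encode("utf-8")  # JSON stands in for Avro encoding
    return {"schema_id": schema_id, "body": body}


def consume(registry, message):
    """Consumer side: look the schema up by ID, then decode and check the fields."""
    schema = json.loads(registry.get(message["schema_id"]))
    value = json.loads(message["body"].decode("utf-8"))
    expected = {field["name"] for field in schema["fields"]}
    if set(value) != expected:
        raise ValueError("payload does not match the registered schema")
    return value


registry = InMemoryRegistry()
definition = json.dumps({
    "type": "record",
    "name": "Rating",
    "namespace": "my.example",
    "fields": [{"name": "score", "type": "int"}],
})
message = produce(registry, definition, {"score": 42})
print(consume(registry, message))
```

In the real service the registry is remote and the encoding is Avro, but the division of responsibilities is the same: the message carries an ID, and the registry resolves that ID to a schema.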

Azure Schema Registry client

The Azure SDK team previously released stable versions of the .NET, Java, JavaScript/TypeScript, and Python libraries for the Schema Registry client. This client allows you to interact with Schema Registry and perform operations like registering and retrieving a schema. Check out the code samples below.

.NET

Registering

string name = "employeeSample";
SchemaFormat format = SchemaFormat.Avro;
// Example schema's definition
string definition = @"
{
    ""type"" : ""record"",
    ""namespace"" : ""TestSchema"",
    ""name"" : ""Employee"",
    ""fields"" : [
        { ""name"" : ""Name"" , ""type"" : ""string"" },
        { ""name"" : ""Age"", ""type"" : ""int"" }
    ]
}";

Response<SchemaProperties> schemaProperties = client.RegisterSchema(groupName, name, definition, format);

Retrieving

SchemaRegistrySchema schema = client.GetSchema(schemaId);
string definition = schema.Definition;

Java

Registering

String schemaContent = "{\n"
    + "    \"type\" : \"record\",  \n"
    + "    \"namespace\" : \"SampleSchemaNameSpace\", \n"
    + "    \"name\" : \"Person\", \n"
    + "    \"fields\" : [\n"
    + "        { \n"
    + "            \"name\" : \"FirstName\" , \"type\" : \"string\" \n"
    + "        }, \n"
    + "        { \n"
    + "            \"name\" : \"LastName\", \"type\" : \"string\" \n"
    + "        }\n"
    + "    ]\n"
    + "}";
SchemaProperties schemaProperties = schemaRegistryClient.registerSchema("{schema-group}", "{schema-name}",
    schemaContent, SchemaFormat.AVRO);

System.out.println("Registered schema: " + schemaProperties.getId());

Retrieving

String schemaContent = "{\n"
    + "    \"type\" : \"record\",  \n"
    + "    \"namespace\" : \"SampleSchemaNameSpace\", \n"
    + "    \"name\" : \"Person\", \n"
    + "    \"fields\" : [\n"
    + "        { \n"
    + "            \"name\" : \"FirstName\" , \"type\" : \"string\" \n"
    + "        }, \n"
    + "        { \n"
    + "            \"name\" : \"LastName\", \"type\" : \"string\" \n"
    + "        }\n"
    + "    ]\n"
    + "}";
SchemaProperties properties = schemaRegistryClient.getSchemaProperties("{schema-group}", "{schema-name}",
    schemaContent, SchemaFormat.AVRO);

System.out.println("Retrieved schema id: " + properties.getId());

JavaScript/TypeScript

Registering

import { DefaultAzureCredential } from "@azure/identity";
import { SchemaRegistryClient } from "@azure/schema-registry";

const client = new SchemaRegistryClient("<fullyQualifiedNamespace>", new DefaultAzureCredential());

const description = {
  name: "<name>",
  groupName: "<group name>",
  format: "<schema format>",
  definition: "<schema definition>"
};

const registered = await client.registerSchema(description);
console.log(registered.id);

Retrieving

import { DefaultAzureCredential } from "@azure/identity";
import { SchemaRegistryClient } from "@azure/schema-registry";

const client = new SchemaRegistryClient("<fullyQualifiedNamespace>", new DefaultAzureCredential());
const foundSchema = await client.getSchema("<id>");
if (foundSchema) {
  console.log(`Got schema definition=${foundSchema.definition}`);
}

Python

Registering

import os

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
name = "your-schema-name"
format = "Avro"
definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
    schema_properties = schema_registry_client.register_schema(group_name, name, definition, format)
    id = schema_properties.id

Retrieving

import os

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
schema_id = 'your-schema-id'

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
    schema = schema_registry_client.get_schema(schema_id)
    definition = schema.definition
    properties = schema.properties

Azure Schema Registry Avro serializer

Recently, a stable version of the Schema Registry Avro serializer library has also been released, available in the same four languages: .NET, Java, JavaScript/TypeScript, and Python. Combining the serializer and the client allows you to serialize data to, and deserialize data from, the Avro format. See the following code samples, which demonstrate how to use the serializer with event data from Azure Event Hubs and with other data types of your choice.

.NET

Serialize

var serializer = new SchemaRegistryAvroSerializer(client, groupName, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });

var employee = new Employee { Age = 42, Name = "Caketown" };
EventData eventData = await serializer.SerializeAsync<EventData, Employee>(employee);

// the schema ID will be included as a parameter of the content type
Console.WriteLine(eventData.ContentType);

// the serialized Avro data will be stored in the EventBody
Console.WriteLine(eventData.EventBody);

Deserialize

// construct a consumer and consume the event from our event hub
await using var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, fullyQualifiedNamespace, eventHubName, credential);
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync())
{
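    // deserialize the event that was just received, not the one created in the Serialize sample
    Employee deserialized = await serializer.DeserializeAsync<Employee>(receivedEvent.Data);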
    Console.WriteLine(deserialized.Age);
    Console.WriteLine(deserialized.Name);
    break;
}

Java

Serialize

PlayingCard playingCard = new PlayingCard();
playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);
playingCard.setIsFaceCard(false);
playingCard.setCardValue(5);

MessageContent message = serializer.serializeMessageData(playingCard,
    TypeReference.createInstance(MessageContent.class));

Deserialize

SchemaRegistryApacheAvroSerializer serializer = createAvroSchemaRegistrySerializer();
MessageContent message = getSchemaRegistryAvroMessage();
PlayingCard playingCard = serializer.deserializeMessageData(message, TypeReference.createInstance(PlayingCard.class));

JavaScript/TypeScript

Serialize

import { DefaultAzureCredential } from "@azure/identity";
import { createEventDataAdapter } from "@azure/event-hubs";
import { SchemaRegistryClient } from "@azure/schema-registry";
import { AvroSerializer } from "@azure/schema-registry-avro";

const client = new SchemaRegistryClient(
  "<fully qualified namespace>",
  new DefaultAzureCredential()
);
const serializer = new AvroSerializer(client, {
  groupName: "<group>",
  messageAdapter: createEventDataAdapter(),
});

// Example Avro schema
const schema = JSON.stringify({
  type: "record",
  name: "Rating",
  namespace: "my.example",
  fields: [{ name: "score", type: "int" }],
});

// Example value that matches the Avro schema above
const value = { score: 42 };

// Serialize value to a message
const message = await serializer.serialize(value, schema);

Deserialize

// Deserialize a message to value
const deserializedValue = await serializer.deserialize(message);

Python

Serialize

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Deserialize

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
# group_name is needed only because this sample also encodes; decoding alone does not use it
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    # event_data is an EventData object with an Avro-encoded body;
    # `definition` is the Avro schema string from the Serialize sample
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)
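The content type in the sample above has the form `avro/binary+<schema ID>`, which is how a consumer learns which schema to fetch. As a small illustration, the sketch below splits such a string into its two parts. The `split_content_type` helper is a hypothetical name and not part of the library, and the ID used is a made-up placeholder; the encoder does this work for you when you call `decode`.

```python
def split_content_type(content_type: str):
    """Split 'avro/binary+<schema ID>' into its MIME type and schema ID."""
    mime_type, separator, schema_id = content_type.partition("+")
    if not separator or not schema_id:
        raise ValueError(f"no schema ID in content type: {content_type!r}")
    return mime_type, schema_id


# The ID below is a placeholder, not a real schema ID.
mime_type, schema_id = split_content_type("avro/binary+0123456789abcdef")
print(mime_type)
print(schema_id)
```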

The serializer also allows you to cache schemas to optimize performance.

More samples

The following samples demonstrate how to use the Avro serializer to create events with Avro-serialized payloads and then send them to Azure Event Hubs. To prevent this blog post from getting too long, links to samples are given here instead of code:

Summary

Azure Schema Registry allows your consumer and producer applications to work with serialized data without having to worry about schemas. The Schema Registry client allows you to register and retrieve schemas. The Schema Registry Avro serializer allows you to serialize and deserialize data to and from the popular Avro format.

Author

Deyaaeldeen Almahallawi
Software Engineer

Deyaa is a software engineer working on Azure SDKs ranging from Event Hubs to Schema Registry to Text Analytics. He is passionate about library design and developer tools.
