Azure Schema Registry is a feature of Azure Event Hubs. It provides a central repository for schemas used by event-driven and messaging-centric applications, which often exchange structured data payloads in events or messages. A producer uses a schema to serialize the payload before publishing it to a broker such as Azure Event Hubs. A consumer reads the payload from the broker and deserializes it using the same schema. Apache Avro is one commonly used schema format. With Schema Registry, producers and consumers can exchange data without having to manage and share the schema themselves. Read more about this feature at Azure Schema Registry in Azure Event Hubs.
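The following Python sketch shows the exchange without needing an Azure namespace. It uses a hypothetical in-memory registry. InMemorySchemaRegistry, produce and consume are illustrative names, not Azure SDK APIs. The sketch also uses JSON in place of Avro so that the focus stays on the flow. The producer registers the schema and sends only its ID with the payload. The consumer then uses that ID to look the schema up.

```python
import json
import uuid


class InMemorySchemaRegistry:
    """Hypothetical in-memory stand-in for a schema registry (not the Azure SDK API)."""

    def __init__(self):
        self._by_id = {}   # schema ID -> schema definition
        self._by_key = {}  # (group, name, definition) -> schema ID

    def register(self, group, name, definition):
        # Registering the same group/name/definition again returns the existing ID.
        key = (group, name, definition)
        if key not in self._by_key:
            schema_id = uuid.uuid4().hex
            self._by_key[key] = schema_id
            self._by_id[schema_id] = definition
        return self._by_key[key]

    def get_schema(self, schema_id):
        return self._by_id[schema_id]


def produce(registry, group, name, definition, record):
    # The producer sends only the schema ID alongside the payload, never the schema itself.
    schema_id = registry.register(group, name, definition)
    body = json.dumps(record).encode("utf-8")  # JSON stands in for Avro in this sketch
    return {"schema_id": schema_id, "body": body}


def consume(registry, message):
    # The consumer resolves the schema from the ID that travels with the message.
    schema = json.loads(registry.get_schema(message["schema_id"]))
    record = json.loads(message["body"])
    expected_fields = {field["name"] for field in schema["fields"]}
    if set(record) != expected_fields:
        raise ValueError("payload does not match the registered schema")
    return record


employee_schema = json.dumps({
    "type": "record",
    "namespace": "TestSchema",
    "name": "Employee",
    "fields": [
        {"name": "Name", "type": "string"},
        {"name": "Age", "type": "int"},
    ],
})
registry = InMemorySchemaRegistry()
message = produce(registry, "my-group", "employeeSample", employee_schema, {"Name": "Caketown", "Age": 42})
print(consume(registry, message))  # {'Name': 'Caketown', 'Age': 42}
```

In the real service, the Schema Registry in your Event Hubs namespace plays the role of the registry. The libraries shown below handle registration, lookup and encoding for you.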
Azure Schema Registry client
The Azure SDK team previously released stable versions of the Schema Registry client library for .NET, Java, JavaScript/TypeScript, and Python. The client lets you interact with Schema Registry to perform operations such as registering a schema and retrieving a schema. Check out the code samples below.
.NET
Registering
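// Requires the Azure.Data.SchemaRegistry and Azure.Identity packages
using Azure;
using Azure.Data.SchemaRegistry;
using Azure.Identity;

var client = new SchemaRegistryClient("<fully-qualified-namespace>", new DefaultAzureCredential());
string groupName = "<schema-group>";
string name = "employeeSample";
SchemaFormat format = SchemaFormat.Avro;
// Example schema's definition
string definition = @"
{
    ""type"" : ""record"",
    ""namespace"" : ""TestSchema"",
    ""name"" : ""Employee"",
    ""fields"" : [
        { ""name"" : ""Name"" , ""type"" : ""string"" },
        { ""name"" : ""Age"", ""type"" : ""int"" }
    ]
}";
Response<SchemaProperties> schemaProperties = client.RegisterSchema(groupName, name, definition, format);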
Retrieving
// schemaId is the ID returned when the schema was registered (schemaProperties.Value.Id)
SchemaRegistrySchema schema = client.GetSchema(schemaId);
string definition = schema.Definition;
Java
Registering
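// Requires the azure-data-schemaregistry and azure-identity packages
SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClientBuilder()
    .fullyQualifiedNamespace("{fully-qualified-namespace}")
    .credential(new DefaultAzureCredentialBuilder().build())
    .buildClient();

String schemaContent = "{\n"
    + "    \"type\" : \"record\", \n"
    + "    \"namespace\" : \"SampleSchemaNameSpace\", \n"
    + "    \"name\" : \"Person\", \n"
    + "    \"fields\" : [\n"
    + "        { \n"
    + "            \"name\" : \"FirstName\" , \"type\" : \"string\" \n"
    + "        }, \n"
    + "        { \n"
    + "            \"name\" : \"LastName\", \"type\" : \"string\" \n"
    + "        }\n"
    + "    ]\n"
    + "}";
SchemaProperties schemaProperties = schemaRegistryClient.registerSchema("{schema-group}", "{schema-name}",
    schemaContent, SchemaFormat.AVRO);
System.out.println("Registered schema: " + schemaProperties.getId());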
Retrieving
// Look up the properties (including the ID) of an already registered schema by its group, name, and definition
String schemaContent = "{\n"
    + "    \"type\" : \"record\", \n"
    + "    \"namespace\" : \"SampleSchemaNameSpace\", \n"
    + "    \"name\" : \"Person\", \n"
    + "    \"fields\" : [\n"
    + "        { \n"
    + "            \"name\" : \"FirstName\" , \"type\" : \"string\" \n"
    + "        }, \n"
    + "        { \n"
    + "            \"name\" : \"LastName\", \"type\" : \"string\" \n"
    + "        }\n"
    + "    ]\n"
    + "}";
SchemaProperties properties = schemaRegistryClient.getSchemaProperties("{schema-group}", "{schema-name}",
    schemaContent, SchemaFormat.AVRO);
System.out.println("Retrieved schema id: " + properties.getId());
JavaScript/TypeScript
Registering
import { DefaultAzureCredential } from "@azure/identity";
import { SchemaRegistryClient } from "@azure/schema-registry";
const client = new SchemaRegistryClient("<fullyQualifiedNamespace>", new DefaultAzureCredential());
const description = {
  name: "<name>",
  groupName: "<group name>",
  format: "<schema format>",
  definition: "<schema definition>",
};
const registered = await client.registerSchema(description);
console.log(registered.id);
Retrieving
import { DefaultAzureCredential } from "@azure/identity";
import { SchemaRegistryClient } from "@azure/schema-registry";
const client = new SchemaRegistryClient("<fullyQualifiedNamespace>", new DefaultAzureCredential());
const foundSchema = await client.getSchema("<id>");
if (foundSchema) {
  console.log(`Got schema definition=${foundSchema.definition}`);
}
Python
Registering
import os
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMA_REGISTRY_GROUP']
name = "your-schema-name"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
    schema_properties = schema_registry_client.register_schema(group_name, name, definition, format)
    schema_id = schema_properties.id
Retrieving
import os
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
schema_id = 'your-schema-id'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace=fully_qualified_namespace, credential=token_credential)
with schema_registry_client:
    schema = schema_registry_client.get_schema(schema_id)
    definition = schema.definition
    properties = schema.properties
Azure Schema Registry Avro serializer
A stable version of the Schema Registry Avro serializer library has also been released recently, for the same four languages: .NET, Java, JavaScript/TypeScript, and Python. When you combine the serializer with the client, you can serialize data to the Avro format and deserialize data from it. The following samples show how to use the serializer with event data from Azure Event Hubs and with other data types of your choice.
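The serializer libraries handle the encoding for you, but it helps to see what Avro's binary encoding of a record looks like. The following Python sketch hand-encodes the User record from the Python samples, following the Avro specification:

- Int and long values are zig-zag varints.
- A string is its byte length followed by its UTF-8 bytes.
- A union is the index of the chosen branch followed by the value.
- A record is its fields concatenated in schema order.

The function names are hypothetical, and the sketch handles only this one schema.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an Avro int/long: zig-zag (0,-1,1,-2 -> 0,1,2,3), then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(s: str) -> bytes:
    """Encode an Avro string: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_user(user: dict) -> bytes:
    """Encode the example User record; fields are written in schema order."""
    out = encode_string(user["name"])
    # favorite_number is ["int", "null"]: branch 0 is int, branch 1 is null
    if user.get("favorite_number") is None:
        out += zigzag_varint(1)
    else:
        out += zigzag_varint(0) + zigzag_varint(user["favorite_number"])
    # favorite_color is ["string", "null"]: branch 0 is string, branch 1 is null
    if user.get("favorite_color") is None:
        out += zigzag_varint(1)
    else:
        out += zigzag_varint(0) + encode_string(user["favorite_color"])
    return out


user = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
print(encode_user(user).hex())  # 0642656e000e0006726564
```

The output contains no field names or types. It holds only values in schema order, which is why a consumer needs the exact schema the producer used in order to decode it.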
.NET
Serialize
var serializer = new SchemaRegistryAvroSerializer(client, groupName, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
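// Employee is a class generated from the Avro schema (for example, with Apache Avro's avrogen tool) that implements ISpecificRecord
var employee = new Employee { Age = 42, Name = "Caketown" };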
EventData eventData = await serializer.SerializeAsync<EventData, Employee>(employee);
// the schema Id will be included as a parameter of the content type
Console.WriteLine(eventData.ContentType);
// the serialized Avro data will be stored in the EventBody
Console.WriteLine(eventData.EventBody);
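As the comment in the sample says, the schema ID travels in the event's content type rather than in the body. The Python Deserialize sample further down shows the form avro/binary+<schema ID>. A consumer can therefore recover the ID with a simple string split. The Python sketch below assumes that format. split_content_type is a hypothetical helper, not an SDK function. The serializer libraries do this step for you.

```python
from typing import Tuple


def split_content_type(content_type: str) -> Tuple[str, str]:
    """Hypothetical helper: split 'avro/binary+<schema ID>' into its two parts."""
    mime_type, plus, schema_id = content_type.partition("+")
    if not plus or not schema_id:
        raise ValueError(f"no schema ID in content type {content_type!r}")
    return mime_type, schema_id


mime_type, schema_id = split_content_type("avro/binary+0123456789abcdef0123456789abcdef")
print(mime_type)  # avro/binary
print(schema_id)  # 0123456789abcdef0123456789abcdef
```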
Deserialize
// construct a consumer and read the event back from our event hub
await using var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, fullyQualifiedNamespace, eventHubName, credential);
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync())
{
    // deserialize the event that was received, not the locally created eventData
    Employee deserialized = await serializer.DeserializeAsync<Employee>(receivedEvent.Data);
    Console.WriteLine(deserialized.Age);
    Console.WriteLine(deserialized.Name);
    break;
}
Java
Serialize
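// PlayingCard and PlayingCardSuit are classes generated from an Avro schema;
// serializer is a SchemaRegistryApacheAvroSerializer, created as in the Deserialize sample
PlayingCard playingCard = new PlayingCard();
playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);
playingCard.setIsFaceCard(false);
playingCard.setCardValue(5);
MessageContent message = serializer.serializeMessageData(playingCard,
    TypeReference.createInstance(MessageContent.class));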
Deserialize
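// createAvroSchemaRegistrySerializer() and getSchemaRegistryAvroMessage() are sample helper methods (not library APIs)
// that build the serializer and obtain an Avro-encoded message
SchemaRegistryApacheAvroSerializer serializer = createAvroSchemaRegistrySerializer();
MessageContent message = getSchemaRegistryAvroMessage();
PlayingCard playingCard = serializer.deserializeMessageData(message, TypeReference.createInstance(PlayingCard.class));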
JavaScript/TypeScript
Serialize
import { DefaultAzureCredential } from "@azure/identity";
import { createEventDataAdapter } from "@azure/event-hubs";
import { SchemaRegistryClient } from "@azure/schema-registry";
import { AvroSerializer } from "@azure/schema-registry-avro";
const client = new SchemaRegistryClient(
  "<fully qualified namespace>",
  new DefaultAzureCredential()
);
const serializer = new AvroSerializer(client, {
  groupName: "<group>",
  messageAdapter: createEventDataAdapter(),
});
// Example Avro schema
const schema = JSON.stringify({
  type: "record",
  name: "Rating",
  namespace: "my.example",
  fields: [{ name: "score", type: "int" }],
});
// Example value that matches the Avro schema above
const value = { score: 42 };
// Serialize value to a message
const message = await serializer.serialize(value, schema);
Deserialize
// Deserialize a message to value
const deserializedValue = await serializer.deserialize(message);
Python
Serialize
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    # OR
    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Deserialize
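import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
# Same schema as in the Serialize sample; it must already be registered in group_name
definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
# group_name is required only for encoding; decoding looks up the schema by the ID in the content type
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
    # Build an EventData with an Avro-encoded body (in practice, an event received from Event Hubs)
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)
    # OR decode raw content together with its content type
    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)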
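Decoding reverses the process. The following Python sketch reads the Avro binary encoding of the example User record, following the Avro specification. Its helper names are hypothetical, and it handles only that schema. The input bytes encode {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}.

```python
from typing import Optional, Tuple


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read one Avro int/long (zig-zag varint) at pos; return (value, next_pos)."""
    shift = 0
    z = 0
    while True:
        byte = buf[pos]
        pos += 1
        z |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:  # high bit clear: last byte of this varint
            break
    return (z >> 1) ^ -(z & 1), pos  # undo the zig-zag mapping


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    """Read an Avro string: a long byte length followed by UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_user(buf: bytes) -> dict:
    """Decode the example User record; field order and types come from its schema."""
    name, pos = read_string(buf, 0)
    favorite_number: Optional[int] = None
    branch, pos = read_long(buf, pos)  # ["int", "null"]: 0 = int, 1 = null
    if branch == 0:
        favorite_number, pos = read_long(buf, pos)
    favorite_color: Optional[str] = None
    branch, pos = read_long(buf, pos)  # ["string", "null"]: 0 = string, 1 = null
    if branch == 0:
        favorite_color, pos = read_string(buf, pos)
    return {"name": name, "favorite_number": favorite_number, "favorite_color": favorite_color}


print(decode_user(bytes.fromhex("0642656e000e0006726564")))
# {'name': 'Ben', 'favorite_number': 7, 'favorite_color': 'red'}
```

The decoder must know the field order and union branches in advance, so it depends entirely on the schema. The registry supplies that schema when the decoder looks it up by the ID carried in the content type.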
The serializer also caches schemas and their IDs after the first lookup. This avoids a call to the service for every message and improves performance.
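Caching pays off because many messages usually share a single schema. The following Python sketch shows the idea, not the SDK's implementation. SchemaCache and fake_fetch are hypothetical names. The sketch assumes that a given schema ID always maps to the same definition, so a definition fetched once can be reused.

```python
from typing import Callable, Dict


class SchemaCache:
    """Hypothetical sketch: fetch each schema definition once, then serve it from memory."""

    def __init__(self, fetch: Callable[[str], str]):
        self._fetch = fetch  # stands in for a call to the registry service
        self._definitions: Dict[str, str] = {}

    def get(self, schema_id: str) -> str:
        if schema_id not in self._definitions:  # cache miss: one remote lookup
            self._definitions[schema_id] = self._fetch(schema_id)
        return self._definitions[schema_id]


# Usage with a fake fetch function that records each call to the "service"
calls = []


def fake_fetch(schema_id: str) -> str:
    calls.append(schema_id)
    return '{"type": "record", "name": "User", "fields": []}'


cache = SchemaCache(fake_fetch)
for _ in range(1000):  # for example, decoding 1,000 events that share one schema
    cache.get("abc123")
print(len(calls))  # 1
```

Wrapping the fetch function in functools.lru_cache would have the same effect and would also bound the cache size.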
More samples
The following samples demonstrate how to use the Avro serializer to create events with Avro-serialized payloads and then send them to Azure Event Hubs. To keep this post from getting too long, links to the samples are given here instead of code:
Summary
Azure Schema Registry lets your producer and consumer applications work with serialized data without having to manage and share schemas themselves. The Schema Registry client lets you register and retrieve schemas. The Schema Registry Avro serializer lets you serialize and deserialize data to and from the popular Avro format.