Building A Custom Event Hubs Event Processor with .NET

Matt

Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. Once Event Hubs has collected the data, you can retrieve, transform, and store it using any real-time analytics provider, such as Azure Stream Analytics, or with batching/storage adapters.

In an earlier post about choosing the right Event Hubs client in .NET, we listed the different clients you can use to interact with Event Hubs. One “client” was EventProcessor<TPartition>, which is the low-level machinery that can be used to build a custom event processor. Most customers are well served by the EventProcessorClient. Some users, however, want tighter control over how processor state is stored, want to process events in batches, or want to track additional state per partition while processing events. This article shows how you can use EventProcessor<TPartition> to build your own custom processor with full control over how checkpoint and load balancing data is stored.

What does a processor do?

When consuming events, a consumer is tied to a specific partition of an Event Hub and reads the events from that partition in order. This means that to read all events from an Event Hub, you’ll need one consumer per partition reading and processing events. For some workloads (where the processing can be intensive), you may want these consumers spread across multiple machines. You may even want to add and remove consumers as the rate of incoming events increases or decreases. We call this process load balancing. In addition, you’ll want to make sure that if one of your consumers has a problem (for example, the machine it is running on crashes), you’re able to resume processing later, picking up where you left off. We call this process checkpointing.

You could write code directly against the EventHubConsumerClient or PartitionReceiver to handle these cases. However, the Event Hubs client library also provides a type, EventProcessorClient, which manages reading from all partitions for you, with error handling and recovery built in. It uses the Azure Blob Storage client library to store the state used during load balancing and checkpointing operations. For many customers, that’s sufficient, but some customers have asked for the ability to control how the information related to checkpointing and load balancing is persisted and managed. For customers with unique needs, we’ve exposed the lower-level machinery needed to build your own processor, which gives you full control over how this data is stored and more influence over the processor’s operation.

Building an event processor with EventProcessor<TPartition>

The low-level machinery needed to build your own processor is exposed by the EventProcessor<TPartition> class in the Azure.Messaging.EventHubs.Primitives namespace. This is an abstract class with five abstract methods you’ll need to implement. If you’ve used the EventProcessorClient, the first two will be familiar to you:

  • OnProcessingEventBatchAsync: The actual “business logic” of your processor. This is similar to the ProcessEventAsync event exposed by EventProcessorClient.
  • OnProcessingErrorAsync: A handler that can be used to observe exceptions from inside the machinery of the EventProcessor itself. This is similar to the ProcessErrorAsync event exposed by EventProcessorClient.

The remaining three deal with checkpointing and load balancing.

  • ListOwnershipAsync: Used by load balancing to see which partitions have been assigned to which processors.
  • ClaimOwnershipAsync: Used by load balancing to grab ownership of a partition (which may or may not be assigned to another processor).
  • ListCheckpointsAsync: Used to find the starting point inside a partition when a processor starts reading events from a partition.

There are two additional virtual methods that you may override if you choose (the default implementations do nothing):

  • OnInitializingPartitionAsync: Called before the processor begins processing events for a partition. Override this if you want to perform some custom initialization before processing events.
  • OnPartitionProcessingStoppedAsync: Called when processing for a given partition is stopping. Override this if you want to perform some additional steps per partition when processing has stopped.

The On* methods can only be implemented once we know what we want our event processor to do. The remaining methods, however, can be implemented independently of the business logic for processing events. To see how, let’s build our own little processor library: an abstract class that does not implement the On* methods but does manage storing ownership and checkpoint information. Once we’ve built that library, we’ll show how it can be used to build a complete event processor by adding some business logic. We could choose any durable storage system for our ownership and checkpoint information, but we’ll use Azure Blob Storage, with the same scheme that EventProcessorClient uses.

We’ll start by inheriting from EventProcessor<TPartition>:

public abstract class AzureBlobStorageEventProcessor<TPartition> : EventProcessor<TPartition> where TPartition : EventProcessorPartition, new()

You might be wondering exactly what this TPartition generic parameter is doing here. EventProcessor<TPartition> is generic and needs to be instantiated with either EventProcessorPartition or a subclass of it. We call this the partition context, and it is passed to the On* methods. If you have partition-specific state that you’d like to track inside your processor, you can subclass EventProcessorPartition to add additional data. Before the processor starts to read events from a partition, OnInitializingPartitionAsync is called, and a newly constructed instance of your subclass is passed in. You can then set whatever partition-specific state you want on that object. The same instance will be passed to the other On* methods for as long as your processor owns the partition. In this case, we don’t know what sort of partition-specific information a subclass might want to track, so we leave that choice to whoever subclasses AzureBlobStorageEventProcessor<TPartition>.

Since we’re using Azure Blob Storage to store our state, we’ll need a BlobContainerClient so we can read and write blobs in a container. Let’s add a property for it, as well as some constructors:

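// Namespaces needed by the complete AzureBlobStorageEventProcessor<TPartition> built in this post.
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Azure;                                  // ETag, RequestFailedException
using Azure.Core;                             // TokenCredential
using Azure.Messaging.EventHubs;              // EventData
using Azure.Messaging.EventHubs.Consumer;     // EventPosition
using Azure.Messaging.EventHubs.Primitives;   // EventProcessor<TPartition> and related types
using Azure.Storage.Blobs;                    // BlobContainerClient, BlobClient
using Azure.Storage.Blobs.Models;             // BlobItem, BlobTraits, BlobRequestConditions, BlobErrorCode, ...

public abstract class AzureBlobStorageEventProcessor<TPartition> : EventProcessor<TPartition> where TPartition : EventProcessorPartition, new()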
{
    private BlobContainerClient StorageContainer { get; }

    protected AzureBlobStorageEventProcessor(int eventBatchMaximumCount, string consumerGroup, string connectionString, BlobContainerClient storageContainer, EventProcessorOptions options = null)
        : base(eventBatchMaximumCount, consumerGroup, connectionString, options)
    {
        StorageContainer = storageContainer;
    }

    protected AzureBlobStorageEventProcessor(int eventBatchMaximumCount, string consumerGroup, string connectionString, string eventHubName, BlobContainerClient storageContainer, EventProcessorOptions options = null)
        : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options)
    {
        StorageContainer = storageContainer;
    }

    protected AzureBlobStorageEventProcessor(int eventBatchMaximumCount, string consumerGroup, string fullyQualifiedNamespace, string eventHubName, TokenCredential credential, BlobContainerClient storageContainer, EventProcessorOptions options = null)
        : base(eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options)
    {
        StorageContainer = storageContainer;
    }

    // More to come!
}

With all of that done, we can turn to adding support for load balancing.

Load Balancing

As mentioned earlier, an Event Hub is made up of multiple partitions, and each partition is independent. The goal of load balancing is to assign ownership of all of the partitions to one or more processors. When a processor is assigned ownership of a partition, it can connect to that partition, start reading events, and deliver them to be processed. The core of the load balancing algorithm is contained inside the EventProcessor<TPartition> base class. For it to work, we need to implement two methods. ListOwnershipAsync lets the load balancer see which partitions have been assigned to which hosts. ClaimOwnershipAsync lets the load balancer claim ownership of a specific partition for this processor.
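Here is a rough mental model of what the base class is aiming for: an even split. Every processor should own at least partitionCount / processorCount partitions, and a few own one extra. This is a simplification written for this post, not the actual EventProcessor<TPartition> implementation. The FairShare helper and the processor names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: how many partitions each processor should aim to own if they are
// spread as evenly as possible. A simplified model, not the EventProcessor<TPartition> algorithm.
static Dictionary<string, int> FairShare(int partitionCount, IReadOnlyList<string> processorIds)
{
    if (processorIds.Count == 0)
    {
        throw new ArgumentException("At least one processor is required.", nameof(processorIds));
    }

    int minimum = partitionCount / processorIds.Count;   // every processor owns at least this many
    int remainder = partitionCount % processorIds.Count; // this many processors own one extra

    // Ordinal sort so every processor independently agrees on who gets the extra partitions.
    return processorIds
        .OrderBy(id => id, StringComparer.Ordinal)
        .Select((id, index) => (id, share: minimum + (index < remainder ? 1 : 0)))
        .ToDictionary(pair => pair.id, pair => pair.share);
}

// Example: 32 partitions spread across 3 processors.
Dictionary<string, int> shares = FairShare(32, new[] { "processor-a", "processor-b", "processor-c" });
foreach (KeyValuePair<string, int> share in shares)
{
    Console.WriteLine($"{share.Key}: {share.Value} partitions");
}
// processor-a: 11 partitions
// processor-b: 11 partitions
// processor-c: 10 partitions
```

Sorting the identifiers means each processor can compute the same answer on its own. That matters because processors coordinate only through the shared ownership records.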

We’ll start by implementing the ListOwnershipAsync method:

protected override async Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken cancellationToken = default)
{
    // TODO
}

The EventProcessorPartitionOwnership type represents everything needed to understand what instance of a processor owns a specific partition. The class looks like this (with comments removed to save some space):

public class EventProcessorPartitionOwnership
{
    public string FullyQualifiedNamespace { get; set; }
    public string EventHubName { get; set; }
    public string ConsumerGroup { get; set; }
    public string OwnerIdentifier { get; set; }
    public string PartitionId { get; set; }
    public DateTimeOffset LastModifiedTime { get; set; }
    public string Version { get; set; }
}

This is all the information we’ll need to store in Blob Storage. Some of these properties map directly to existing Event Hubs concepts, but OwnerIdentifier, LastModifiedTime, and Version are specific to load balancing. Let’s discuss what each of them is used for:

  • OwnerIdentifier: Each instance of EventProcessor<TPartition> needs to have a unique identifier assigned to it. By default, this is a randomly generated GUID, but you can override it via the EventProcessorOptions options bag when you construct an instance.

  • LastModifiedTime: The last time this ownership record was updated. If an ownership record hasn’t been updated in a while (the interval can be configured with EventProcessorOptions), EventProcessor<TPartition> treats the partition as unowned.

  • Version: The load balancing algorithm does not use this directly. Instead, the implementation of ClaimOwnershipAsync uses Version to make sure the ownership of a partition has not changed between the processor’s call to ListOwnershipAsync and its decision to claim the partition. When you have multiple processors running on different hosts, this ensures that two hosts can’t both claim the same partition at the same time.
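The sketch below makes the LastModifiedTime rule concrete by checking whether an ownership record has gone stale. The IsOwnershipExpired helper and the 30-second interval are illustrative assumptions. The real check happens inside EventProcessor<TPartition>, using the expiration interval configured through EventProcessorOptions.

```csharp
using System;

// Hypothetical helper modelling the LastModifiedTime rule: a record that has not been
// renewed within the expiration interval is treated as if nobody owns the partition.
static bool IsOwnershipExpired(DateTimeOffset lastModifiedTime, DateTimeOffset now, TimeSpan expirationInterval) =>
    now - lastModifiedTime > expirationInterval;

DateTimeOffset checkTime = new DateTimeOffset(2024, 1, 1, 12, 0, 0, TimeSpan.Zero);
TimeSpan expiration = TimeSpan.FromSeconds(30); // illustrative value only (assumption)

bool recentlyRenewedExpired = IsOwnershipExpired(checkTime.AddSeconds(-10), checkTime, expiration);
bool abandonedExpired = IsOwnershipExpired(checkTime.AddMinutes(-5), checkTime, expiration);

Console.WriteLine($"Renewed 10 seconds ago, expired? {recentlyRenewedExpired}"); // ... expired? False
Console.WriteLine($"Renewed 5 minutes ago, expired? {abandonedExpired}");       // ... expired? True
```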

To store this information in Blob Storage, we’ll create an empty blob and set some metadata on it. The path to the blob will contain the fully qualified namespace, the Event Hub name, the consumer group, and the partition id. Segmenting things this way lets us use the same Blob Storage container for multiple different processors. The owner identifier will be stored as metadata on the blob (we could just as well store it in the blob’s content), and we’ll use the blob’s ETag as the “version”.
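Here is a self-contained sketch of that naming scheme. It mirrors the string.Format prefix used in ListOwnershipAsync and recovers the partition id the same way, with a Substring after the prefix. The namespace, Event Hub, and consumer group values are made up for illustration.

```csharp
using System;

// Builds "{namespace}/{event hub}/{consumer group}/ownership/", lower-cased so that
// differently-cased inputs always map to the same blobs.
static string OwnershipPrefix(string fullyQualifiedNamespace, string eventHubName, string consumerGroup) =>
    string.Format("{0}/{1}/{2}/ownership/",
        fullyQualifiedNamespace.ToLowerInvariant(),
        eventHubName.ToLowerInvariant(),
        consumerGroup.ToLowerInvariant());

// One blob per partition: the prefix followed by the partition id.
static string OwnershipBlobName(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) =>
    OwnershipPrefix(fullyQualifiedNamespace, eventHubName, consumerGroup) + partitionId;

// Recovers the partition id from a blob name returned by a prefix listing.
static string PartitionIdFromBlobName(string name, string listingPrefix) =>
    name.Substring(listingPrefix.Length);

// Hypothetical namespace, Event Hub, and consumer group names.
string ownershipPrefix = OwnershipPrefix("MyNamespace.servicebus.windows.net", "Telemetry", "$Default");
string ownershipBlob = OwnershipBlobName("MyNamespace.servicebus.windows.net", "Telemetry", "$Default", "3");

Console.WriteLine(ownershipBlob);                                           // mynamespace.servicebus.windows.net/telemetry/$default/ownership/3
Console.WriteLine(PartitionIdFromBlobName(ownershipBlob, ownershipPrefix)); // 3
```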

Now that we understand how we are going to encode this information, we can write our implementation of ListOwnershipAsync:

private const string OwnershipPrefixFormat = "{0}/{1}/{2}/ownership/";
private const string OwnerIdentifierMetadataKey = "ownerid";

protected override async Task<IEnumerable<EventProcessorPartitionOwnership>> ListOwnershipAsync(CancellationToken cancellationToken = default)
{
    List<EventProcessorPartitionOwnership> partitionOwnerships = new List<EventProcessorPartitionOwnership>();
    string ownershipBlobsPrefix = string.Format(OwnershipPrefixFormat, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant());

    // Each ownership blob is named "<prefix><partition id>"; the owner is stored in its metadata and its ETag is the version.
    await foreach (BlobItem blob in StorageContainer.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: ownershipBlobsPrefix, cancellationToken: cancellationToken).ConfigureAwait(false))
    {
        partitionOwnerships.Add(new EventProcessorPartitionOwnership()
        {
            ConsumerGroup = ConsumerGroup,
            EventHubName = EventHubName,
            FullyQualifiedNamespace = FullyQualifiedNamespace,
            LastModifiedTime = blob.Properties.LastModified.GetValueOrDefault(),
            OwnerIdentifier = blob.Metadata[OwnerIdentifierMetadataKey],
            PartitionId = blob.Name.Substring(ownershipBlobsPrefix.Length),
            Version = blob.Properties.ETag.ToString()
        });
    }

    return partitionOwnerships;
}

Note that when we first run our processor, there will be no blobs in our container, so this method will return an empty sequence. The load balancing algorithm interprets this as “no partitions are currently owned”.

Implementing ClaimOwnershipAsync is a little trickier, but not by much:

protected override async Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventProcessorPartitionOwnership> desiredOwnership, CancellationToken cancellationToken = default)
{
    // TODO
}        

When ClaimOwnershipAsync is called, we are given a set of partitions we should claim ownership for. Note that some of these partitions may already be owned by this processor. This is done so we can “renew” the ownership of these partitions. We need to return an enumerable of the partitions we were able to claim ownership for.

A small note on the Version field of EventProcessorPartitionOwnership: when a partition has no ownership information associated with it, Version will be null. In our case, this happens when a partition does not have a corresponding blob in Blob Storage, so we need to create the blob. Otherwise, the blob already exists, and we want to update it only if it hasn’t changed since we listed it. So in both cases we’ll make a conditional request.

protected override async Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimOwnershipAsync(IEnumerable<EventProcessorPartitionOwnership> desiredOwnership, CancellationToken cancellationToken = default)
{
    List<EventProcessorPartitionOwnership> claimedOwnerships = new List<EventProcessorPartitionOwnership>();

    foreach (EventProcessorPartitionOwnership ownership in desiredOwnership)
    {
        Dictionary<string, string> ownershipMetadata = new Dictionary<string, string>()
        {
            { OwnerIdentifierMetadataKey, ownership.OwnerIdentifier },
        };

        // Construct the path to the blob and get a blob client for it so we can interact with it.
        string ownershipBlob = string.Format(OwnershipPrefixFormat + ownership.PartitionId, ownership.FullyQualifiedNamespace.ToLowerInvariant(), ownership.EventHubName.ToLowerInvariant(), ownership.ConsumerGroup.ToLowerInvariant());
        BlobClient ownershipBlobClient = StorageContainer.GetBlobClient(ownershipBlob);

        try
        {
            if (ownership.Version == null)
            {
                // In this case, we are trying to claim ownership of a partition which was previously unowned, and hence did not have an ownership file. To ensure only a single host grabs the partition, 
                // we use a conditional request so that we only create our blob in the case where it does not yet exist.
                BlobRequestConditions requestConditions = new BlobRequestConditions() { IfNoneMatch = ETag.All };

                using MemoryStream emptyStream = new MemoryStream(Array.Empty<byte>());
                BlobContentInfo info = await ownershipBlobClient.UploadAsync(emptyStream, metadata: ownershipMetadata, conditions: requestConditions, cancellationToken: cancellationToken).ConfigureAwait(false);

                claimedOwnerships.Add(new EventProcessorPartitionOwnership()
                {
                    ConsumerGroup = ownership.ConsumerGroup,
                    EventHubName = ownership.EventHubName,
                    FullyQualifiedNamespace = ownership.FullyQualifiedNamespace,
                    LastModifiedTime = info.LastModified,
                    OwnerIdentifier = ownership.OwnerIdentifier,
                    PartitionId = ownership.PartitionId,
                    Version = info.ETag.ToString()
                });
            }
            else
            {
                // In this case, an ownership blob already exists: the partition may be owned by this processor (a renewal), by another host, or its
                // ownership may have expired. We only update the metadata if it has not changed since we listed ownership, i.e. the blob's ETag still matches.
                BlobRequestConditions requestConditions = new BlobRequestConditions() { IfMatch = new ETag(ownership.Version) };
                BlobInfo info = await ownershipBlobClient.SetMetadataAsync(ownershipMetadata, requestConditions, cancellationToken).ConfigureAwait(false);

                claimedOwnerships.Add(new EventProcessorPartitionOwnership()
                {
                    ConsumerGroup = ownership.ConsumerGroup,
                    EventHubName = ownership.EventHubName,
                    FullyQualifiedNamespace = ownership.FullyQualifiedNamespace,
                    LastModifiedTime = info.LastModified,
                    OwnerIdentifier = ownership.OwnerIdentifier,
                    PartitionId = ownership.PartitionId,
                    Version = info.ETag.ToString()
                });
            }
        }
        catch (RequestFailedException e) when (e.ErrorCode == BlobErrorCode.BlobAlreadyExists || e.ErrorCode == BlobErrorCode.ConditionNotMet)
        {
            // In this case, another host has claimed the partition before we did. That's safe to ignore. We'll still try to claim other partitions.
        }
    }

    return claimedOwnerships;
}

Note the try/catch which handles the case where we fail to claim ownership because some other processor beat us to it.
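The conditional requests are easier to reason about with Blob Storage taken out of the picture. The sketch below is an in-memory stand-in that mimics IfNoneMatch = ETag.All and IfMatch = version. TryCreate and TryUpdate are hypothetical helpers, not Azure SDK calls. The sketch replays two hosts racing for the same partition.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for ownership blobs: blob name -> (owner, ETag). It models only the
// conditional-request semantics used by ClaimOwnershipAsync; it is not the Blob Storage SDK.
var blobs = new Dictionary<string, (string Owner, string ETag)>();
int etagCounter = 0;

// Mimics an upload with IfNoneMatch = ETag.All: succeeds only if the blob does not exist yet.
bool TryCreate(string name, string owner, out string etag)
{
    etag = null;
    if (blobs.ContainsKey(name))
    {
        return false; // the real service rejects this request (e.g. BlobAlreadyExists)
    }

    etag = $"\"{++etagCounter}\"";
    blobs[name] = (owner, etag);
    return true;
}

// Mimics SetMetadataAsync with IfMatch = version: succeeds only if the ETag is unchanged.
bool TryUpdate(string name, string owner, string expectedETag, out string etag)
{
    etag = null;
    if (!blobs.TryGetValue(name, out var current) || current.ETag != expectedETag)
    {
        return false; // the real service rejects this request (e.g. ConditionNotMet)
    }

    etag = $"\"{++etagCounter}\"";
    blobs[name] = (owner, etag);
    return true;
}

const string Blob = "ns/hub/$default/ownership/0";

// Both hosts saw no ownership record (Version == null) and race to create it.
bool hostACreated = TryCreate(Blob, "host-a", out string firstVersion);
bool hostBCreated = TryCreate(Blob, "host-b", out _);
Console.WriteLine($"host-a created: {hostACreated}, host-b created: {hostBCreated}");
// host-a created: True, host-b created: False

// Both hosts then list ownership and see firstVersion. host-a renews first,
// so host-b's attempt to take over with the now-stale version is rejected.
bool hostARenewed = TryUpdate(Blob, "host-a", firstVersion, out string renewedVersion);
bool hostBTookOver = TryUpdate(Blob, "host-b", firstVersion, out _);
Console.WriteLine($"host-a renewed: {hostARenewed}, host-b took over: {hostBTookOver}");
// host-a renewed: True, host-b took over: False
Console.WriteLine($"owner: {blobs[Blob].Owner}"); // owner: host-a
```

In both cases the loser simply gets a rejected request. This is why ClaimOwnershipAsync can treat those errors as “someone else got there first” and carry on with the remaining partitions.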

With these two methods implemented, our processor supports load balancing! All that’s left to handle is checkpointing.

Checkpointing

When the processor claims ownership of a partition and wants to start reading events from it, it has to decide where in the partition to begin reading. To inform this decision, the processor asks for the checkpoints stored for the Event Hub by calling ListCheckpointsAsync:

protected override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken cancellationToken = default)
{
    // TODO
}

Looking at EventProcessorCheckpoint we’ll see all the information we’ll need to return:

public class EventProcessorCheckpoint
{
    public string FullyQualifiedNamespace { get; set; }
    public string EventHubName { get; set; }
    public string ConsumerGroup { get; set; }
    public string PartitionId { get; set; }
    public EventPosition StartingPosition { get; set; }
}

We can construct an EventPosition from an enqueued time, an offset, or a sequence number. The offset and sequence number are properties on each EventData object read from the Event Hubs service. For checkpoints, we’ll use the same strategy as for ownership. We’ll combine the fully qualified namespace, Event Hub name, consumer group, and partition id into a blob name, and then store the checkpoint information as metadata on the blob.
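Because blob metadata is just a dictionary of strings, the offset has to survive a round trip through text. The sketch below assumes the same "offset" key as OffsetMetadataKey. ToCheckpointMetadata and ReadCheckpointOffset are hypothetical helpers that mirror the checkpoint writing and reading code in this post, using the invariant culture in both directions.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Blob metadata is string -> string, so the offset is written with the invariant culture
// and parsed back the same way. "offset" mirrors OffsetMetadataKey from this post.
static Dictionary<string, string> ToCheckpointMetadata(long offset) =>
    new Dictionary<string, string> { { "offset", offset.ToString(CultureInfo.InvariantCulture) } };

// Returns null when the key is missing or the value is not a number ("no usable checkpoint").
static long? ReadCheckpointOffset(IDictionary<string, string> metadata) =>
    metadata.TryGetValue("offset", out string value)
        && long.TryParse(value, NumberStyles.Integer, CultureInfo.InvariantCulture, out long offset)
            ? offset
            : (long?)null;

Dictionary<string, string> written = ToCheckpointMetadata(123456);
Console.WriteLine(written["offset"]);                                             // 123456
Console.WriteLine(ReadCheckpointOffset(written));                                 // 123456
Console.WriteLine(ReadCheckpointOffset(new Dictionary<string, string>()) is null); // True
```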

The implementation of ListCheckpointsAsync is similar in structure to ListOwnershipAsync: we loop over all the blobs with a given prefix and pull information from them:

private const string CheckpointPrefixFormat = "{0}/{1}/{2}/checkpoint/";
private const string OffsetMetadataKey = "offset";

protected override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpointsAsync(CancellationToken cancellationToken = default)
{
    List<EventProcessorCheckpoint> checkpoints = new List<EventProcessorCheckpoint>();
    string checkpointBlobsPrefix = string.Format(CheckpointPrefixFormat, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant());

    await foreach (BlobItem item in StorageContainer.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: checkpointBlobsPrefix, cancellationToken: cancellationToken).ConfigureAwait(false))
    {
        if (long.TryParse(item.Metadata[OffsetMetadataKey], NumberStyles.Integer, CultureInfo.InvariantCulture, out long offset))
        {
            checkpoints.Add(new EventProcessorCheckpoint()
            {
                ConsumerGroup = ConsumerGroup,
                EventHubName = EventHubName,
                FullyQualifiedNamespace = FullyQualifiedNamespace,
                PartitionId = item.Name.Substring(checkpointBlobsPrefix.Length),
                StartingPosition = EventPosition.FromOffset(offset, isInclusive: false)
            });
        }
    }

    return checkpoints;
}

Note the use of isInclusive when constructing the EventPosition. By default, EventPosition.FromOffset includes the event with the given offset. However, we want to start from the next event in the stream. Otherwise, we’d process the event we checkpointed a second time.
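The difference between inclusive and exclusive starting positions can be modeled on an in-memory list. In this sketch, ReadFrom is a hypothetical helper that imitates EventPosition.FromOffset(offset, isInclusive). It also falls back to the earliest event when there is no checkpoint. The offsets are invented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Returns the bodies a reader would see, imitating EventPosition.FromOffset(offset, isInclusive)
// on an in-memory partition. With no checkpoint, it starts from the earliest event.
static IEnumerable<string> ReadFrom(IEnumerable<(long Offset, string Body)> events, long? checkpointOffset, bool isInclusive)
{
    if (checkpointOffset is null)
    {
        return events.Select(e => e.Body);
    }

    long start = checkpointOffset.Value;
    return events
        .Where(e => isInclusive ? e.Offset >= start : e.Offset > start)
        .Select(e => e.Body);
}

// A partition with made-up offsets; "b" (offset 120) was the last event we checkpointed.
var partition = new List<(long Offset, string Body)> { (0, "a"), (120, "b"), (250, "c"), (400, "d") };

Console.WriteLine(string.Join(",", ReadFrom(partition, 120, isInclusive: false))); // c,d
Console.WriteLine(string.Join(",", ReadFrom(partition, 120, isInclusive: true)));  // b,c,d ("b" is processed twice)
Console.WriteLine(string.Join(",", ReadFrom(partition, null, isInclusive: false))); // a,b,c,d
```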

In addition to ListCheckpointsAsync, there is an optional method you can override: GetCheckpointAsync, new in version 5.3.0 of the Azure.Messaging.EventHubs package. GetCheckpointAsync is similar to ListCheckpointsAsync, except that it only fetches the checkpoint for a single partition, so the processor can call it when that is all it needs. This allows a more performant implementation. We don’t need to loop over all the blobs in the container and construct multiple EventProcessorCheckpoint objects that will be ignored. Customers have seen substantial performance improvements by implementing GetCheckpointAsync, so we recommend that you do so. The method is virtual instead of abstract because it was added after the EventProcessor<TPartition> class was released, and adding another abstract method would have been a breaking change. The default implementation of GetCheckpointAsync simply calls ListCheckpointsAsync and then loops through the results to find the checkpoint for the requested partition. By overriding it, we can provide a faster implementation that only fetches information for the partition we care about:

private const string CheckpointBlobNameFormat = "{0}/{1}/{2}/checkpoint/{3}";

protected override async Task<EventProcessorCheckpoint> GetCheckpointAsync(string partitionId, CancellationToken cancellationToken)
{
    string checkpointName = string.Format(CheckpointBlobNameFormat, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant(), partitionId);

    try
    {
        BlobProperties properties = await StorageContainer.GetBlobClient(checkpointName).GetPropertiesAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

        if (long.TryParse(properties.Metadata[OffsetMetadataKey], NumberStyles.Integer, CultureInfo.InvariantCulture, out long offset))
        {
            return new EventProcessorCheckpoint()
            {
                ConsumerGroup = ConsumerGroup,
                EventHubName = EventHubName,
                FullyQualifiedNamespace = FullyQualifiedNamespace,
                PartitionId = partitionId,
                StartingPosition = EventPosition.FromOffset(offset, isInclusive: false)
            };
        }
    }
    catch (RequestFailedException e) when (e.ErrorCode == BlobErrorCode.BlobNotFound)
    {
        // There's no checkpoint for this partition yet, but that's okay, so we ignore this exception.
    }

    return null;
}

If no checkpoint exists for a partition, EventProcessor<TPartition> uses the DefaultStartingPosition specified in the EventProcessorOptions bag, which defaults to the earliest event in the partition.
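The toy model below shows why overriding GetCheckpointAsync pays off. Storage is replaced by a dictionary. GetByListing follows the shape of the default implementation described above: list every checkpoint, then search. GetDirect fetches only one entry. The read counter stands in for storage round-trips and allocations and is not a measurement of the real service.

```csharp
using System;
using System.Collections.Generic;

// Toy checkpoint store keyed by blob name (offsets are made up). "reads" counts how many
// records each strategy touches, standing in for storage round-trips and allocations.
const string Prefix = "ns/hub/$default/checkpoint/";
var store = new Dictionary<string, long>();
for (int i = 0; i < 32; i++)
{
    store[Prefix + i] = i * 100L;
}
int reads = 0;

// Like ListCheckpointsAsync: every checkpoint is fetched and materialized.
List<(string PartitionId, long Offset)> ListAll()
{
    var all = new List<(string PartitionId, long Offset)>();
    foreach (KeyValuePair<string, long> entry in store)
    {
        reads++;
        all.Add((entry.Key.Substring(Prefix.Length), entry.Value));
    }
    return all;
}

// Like the default GetCheckpointAsync: list everything, then search for one partition.
long? GetByListing(string partitionId)
{
    foreach (var checkpoint in ListAll())
    {
        if (checkpoint.PartitionId == partitionId)
        {
            return checkpoint.Offset;
        }
    }
    return null;
}

// Like an overridden GetCheckpointAsync: a single lookup for the one blob we need.
long? GetDirect(string partitionId)
{
    reads++; // one request, whether or not the blob exists
    return store.TryGetValue(Prefix + partitionId, out long offset) ? offset : (long?)null;
}

reads = 0;
long? viaListing = GetByListing("17");
int listingReads = reads;

reads = 0;
long? viaDirect = GetDirect("17");
int directReads = reads;

Console.WriteLine($"listing: offset {viaListing}, {listingReads} records read"); // listing: offset 1700, 32 records read
Console.WriteLine($"direct: offset {viaDirect}, {directReads} record read");     // direct: offset 1700, 1 record read
```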

We’ll also add a helper method that will let subclasses write this checkpoint information, by writing an empty blob with the correct metadata set:

protected async Task CheckpointAsync(TPartition partition, EventData data, CancellationToken cancellationToken = default)
{
    string checkpointBlob = string.Format(CheckpointPrefixFormat + partition.PartitionId, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant());
    Dictionary<string, string> checkpointMetadata = new Dictionary<string, string>()
    {
        { OffsetMetadataKey, data.Offset.ToString(CultureInfo.InvariantCulture) },
    };

    using MemoryStream emptyStream = new MemoryStream(Array.Empty<byte>());
    await StorageContainer.GetBlobClient(checkpointBlob).UploadAsync(emptyStream, metadata: checkpointMetadata, cancellationToken: cancellationToken).ConfigureAwait(false);
}

With this done, we’ve implemented our own processor base class with custom state storage, and we can now build our own processors on top of it. For example, here’s a simple processor that just prints the number of events in each batch and checkpoints as it goes:

class CustomProcessor : AzureBlobStorageEventProcessor<EventProcessorPartition>
{
    public CustomProcessor(int eventBatchMaximumCount, string consumerGroup, string fullyQualifiedNamespace, string eventHubName, TokenCredential credential, BlobContainerClient storageContainer, EventProcessorOptions options = null) : base(eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, storageContainer, options)
    {
    }

    protected async override Task OnProcessingEventBatchAsync(IEnumerable<EventData> events, EventProcessorPartition partition, CancellationToken cancellationToken)
    {
        try 
        {
            Console.WriteLine($"Received batch of {events.Count()} events for partition {partition.PartitionId}");

            if (events.Any())
            {
                await CheckpointAsync(partition, events.Last(), cancellationToken);
            }
        }
        catch
        {
            // Catch and ignore, we should not allow exceptions to bubble out of this method.
        }
    }

    protected override Task OnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken)
    {
        try
        {
            if (partition != null)
            {
                Console.Error.WriteLine($"Exception on partition {partition.PartitionId} while performing {operationDescription}: {exception.Message}");
            }
            else
            {
                Console.Error.WriteLine($"Exception while performing {operationDescription}: {exception.Message}");
            }
        }
        catch
        {
            // Catch and ignore, we should not allow exceptions to bubble out of this method.
        }

        // Nothing is awaited here, so return a completed task instead of marking the method async.
        return Task.CompletedTask;
    }
}

This should look familiar if you’ve used EventProcessorClient before, but there are a few small differences you need to be aware of:

  • The processor works in batches, so when you construct it you need to specify the maximum number of events you want to process at once, using the eventBatchMaximumCount parameter. The IEnumerable<EventData> passed to OnProcessingEventBatchAsync will never contain more items than this maximum. Note that it may contain fewer events, so your processing function needs to handle that case.

  • If fewer than eventBatchMaximumCount events are available, there is also a limit on how long the processor will wait for more events to arrive. By default, this is 60 seconds, but it can be controlled by setting the MaximumWaitTime property of the EventProcessorOptions parameter. When that time elapses, the processor delivers a batch with whatever events it has, which may be fewer than the maximum. If MaximumWaitTime is set to null, there is no time limit and the processor keeps waiting for events to arrive. Setting a maximum wait time ensures your processor is called periodically, even when no events have arrived. For example, some customers of EventProcessor<TPartition> use this behavior to generate a “heartbeat”. The heartbeat lets them checkpoint after a period of time has elapsed instead of only when events arrive.

  • If MaximumWaitTime is set, OnProcessingEventBatchAsync will be called even if no events have been consumed. In this case, the IEnumerable<EventData> will contain zero events. This is why the example guards its call to Last(), which throws on an empty sequence.

  • The partition object passed to OnProcessingErrorAsync may be null. This happens when the exception is not tied to a specific partition (for example, an exception thrown by ListOwnershipAsync), so we need to guard for that case as well.
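The batch contract in these bullets can be exercised with a simplified model: never more than eventBatchMaximumCount events, possibly fewer, possibly none. DeliverBatches is hypothetical and does not reproduce the SDK’s exact timing. It only shows the shapes of batches a handler must accept. LastOffsetToCheckpoint shows the empty-batch guard.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model (an assumption, not the SDK's exact timing): each wait interval, the
// processor hands over the events buffered so far in batches of at most maxCount, and hands
// over a single empty batch if nothing arrived before MaximumWaitTime elapsed.
static List<List<long>> DeliverBatches(IEnumerable<int> arrivalsPerInterval, int maxCount)
{
    var delivered = new List<List<long>>();
    var buffer = new Queue<long>();
    long nextOffset = 0;

    foreach (int arrived in arrivalsPerInterval)
    {
        for (int i = 0; i < arrived; i++)
        {
            buffer.Enqueue(nextOffset++);
        }

        do
        {
            var current = new List<long>();
            while (current.Count < maxCount && buffer.Count > 0)
            {
                current.Add(buffer.Dequeue());
            }
            delivered.Add(current); // may hold fewer than maxCount events, or none at all
        }
        while (buffer.Count > 0);
    }

    return delivered;
}

// The empty-batch guard from CustomProcessor: Last() throws on an empty sequence.
static long? LastOffsetToCheckpoint(IReadOnlyCollection<long> events) =>
    events.Count > 0 ? events.Last() : (long?)null;

List<List<long>> batches = DeliverBatches(new[] { 7, 0, 2 }, maxCount: 5);
foreach (List<long> batch in batches)
{
    long? checkpoint = LastOffsetToCheckpoint(batch);
    Console.WriteLine($"batch of {batch.Count}, checkpoint at {(checkpoint.HasValue ? checkpoint.Value.ToString() : "nothing")}");
}
// batch of 5, checkpoint at 4
// batch of 2, checkpoint at 6
// batch of 0, checkpoint at nothing
// batch of 2, checkpoint at 8
```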

One more thing to consider when implementing your own event processor is your strategy for deciding when to checkpoint. In the example above, we checkpoint every time we process a batch of events. For some customers this is unacceptable, because checkpointing can take a significant amount of time relative to the processing logic (checkpointing requires writing a blob to Blob Storage, which involves a network call). You may choose a different strategy depending on your application. Perhaps you can tolerate reprocessing some number of events, so you only checkpoint after that many events have been processed. Alternatively, some customers using EventProcessor<TPartition> checkpoint on a fixed interval (say, every five minutes). They set MaximumWaitTime when creating the processor so that their processing function is called frequently, even when there are no events, and can write the checkpoint. In these cases, the processor would also override OnPartitionProcessingStoppedAsync. That override writes a checkpoint when processing stops if some processed events have not yet been checkpointed.
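A count-or-time checkpoint policy, with a flush when processing stops, could be sketched as follows. OnBatch, OnStopped, and WriteCheckpoint are hypothetical stand-ins for OnProcessingEventBatchAsync, OnPartitionProcessingStoppedAsync, and CheckpointAsync. Time is passed in explicitly so the example is deterministic. In a real processor, this per-partition state could live on a TPartition subclass.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-partition policy: checkpoint after CheckpointEveryEvents events or once
// checkpointInterval has passed with unsaved progress, and flush when processing stops.
const int CheckpointEveryEvents = 100;
TimeSpan checkpointInterval = TimeSpan.FromMinutes(5);

DateTimeOffset start = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero);
DateTimeOffset lastCheckpointTime = start;
int eventsSinceCheckpoint = 0;
var checkpointsWritten = new List<string>();

// Called for every batch, including empty "heartbeat" batches when MaximumWaitTime is set.
void OnBatch(int eventCount, DateTimeOffset now)
{
    eventsSinceCheckpoint += eventCount;
    bool enoughEvents = eventsSinceCheckpoint >= CheckpointEveryEvents;
    bool enoughTime = eventsSinceCheckpoint > 0 && now - lastCheckpointTime >= checkpointInterval;
    if (enoughEvents || enoughTime)
    {
        WriteCheckpoint(now, enoughEvents ? "count" : "time");
    }
}

// Called when processing for the partition stops; only writes if progress is unsaved.
void OnStopped(DateTimeOffset now)
{
    if (eventsSinceCheckpoint > 0)
    {
        WriteCheckpoint(now, "stop");
    }
}

// Stand-in for CheckpointAsync(partition, lastEvent): records why a checkpoint was written.
void WriteCheckpoint(DateTimeOffset now, string reason)
{
    checkpointsWritten.Add(reason);
    eventsSinceCheckpoint = 0;
    lastCheckpointTime = now;
}

OnBatch(60, start.AddMinutes(1));   // 60 unsaved events: nothing yet
OnBatch(50, start.AddMinutes(2));   // 110 unsaved events: checkpoint ("count")
OnBatch(0, start.AddMinutes(4));    // empty heartbeat, nothing unsaved
OnBatch(10, start.AddMinutes(6));   // 10 unsaved, only 4 minutes since the last checkpoint
OnBatch(0, start.AddMinutes(8));    // heartbeat, 6 minutes since the last checkpoint: checkpoint ("time")
OnBatch(5, start.AddMinutes(9));    // 5 unsaved events
OnStopped(start.AddMinutes(9.5));   // processing stops with unsaved progress: checkpoint ("stop")

Console.WriteLine(string.Join(", ", checkpointsWritten)); // count, time, stop
```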

Wrapping Up

Compared to EventProcessorClient, there’s a lot more code needed to get off the ground and start processing events from an Event Hub. Because of this, we believe EventProcessorClient is the right primitive for most customer scenarios when processing data from Event Hubs. However, some customers need finer-grained control over how the processor’s state is stored. For them, EventProcessor<TPartition> provides the framework needed to build a processor with complete control over how its state is persisted.

More detail on the design and philosophy of EventProcessor<TPartition> can be found in the EventProcessor design document.

Azure SDK Blog Contributions

Thank you for reading this Azure SDK blog post! We hope that you learned something new and welcome you to share this post. We are open to Azure SDK blog contributions. Please contact us at azsdkblog@microsoft.com with your topic and we’ll get you set up as a guest blogger.