The Azure SDK for Go team at Microsoft is excited to announce the stable release of the Azure Event Hubs client library for Go. Azure Event Hubs is a fully managed, real-time data ingestion service that’s simple, trusted, and scalable.
NOTE: If you’re using the legacy Azure Event Hubs library for Go and would like to upgrade, see the migration guide.
Install the package
The Azure Event Hubs client library is named azeventhubs. To install the latest version of azeventhubs, use the go get command. You can use the Azure Identity library to authenticate the client application.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
# Optionally, if you also want to use Azure Identity for authentication
go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
We assume that you have:
- An Azure subscription with an Azure Event Hubs namespace.
- A working development environment for Go version 1.18 or above.
For instructions on creating an Azure Event Hubs namespace, follow this step-by-step guide.
Create a client
azeventhubs has two clients:
- The ProducerClient is used to send events to partitions, utilizing batches for efficiency.
- The ConsumerClient is used to consume events from partitions. It's also used by the Processor to receive events using checkpointing.
Both clients are concurrency-safe.
You can create either client using a TokenCredential type, such as DefaultAzureCredential from the Azure Identity library, or using an Azure Event Hubs connection string.
NOTE: We recommend using a TokenCredential type for authentication. Connection string authentication isn't recommended. For more information about going passwordless, see Passwordless connections for Azure services.
Use the DefaultAzureCredential token credential (Recommended)
DefaultAzureCredential combines several credential types into one easy-to-use type. It can authenticate using the Azure CLI, managed identities, and more. For more information about available credential types, see the azidentity documentation.
See the example for NewConsumerClient or NewProducerClient.
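As a minimal sketch (not the official sample), creating both clients with DefaultAzureCredential looks roughly like the following; the namespace and Event Hub names are placeholders you'd replace with your own values.
package main
import (
"context"
"log"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// DefaultAzureCredential tries several credential types (environment, managed identity, Azure CLI, and more).
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Fatal(err)
}
// "<your namespace>.servicebus.windows.net" and "<your event hub>" are placeholders.
producerClient, err := azeventhubs.NewProducerClient("<your namespace>.servicebus.windows.net", "<your event hub>", credential, nil)
if err != nil {
log.Fatal(err)
}
defer producerClient.Close(context.Background())
consumerClient, err := azeventhubs.NewConsumerClient("<your namespace>.servicebus.windows.net", "<your event hub>", azeventhubs.DefaultConsumerGroup, credential, nil)
if err != nil {
log.Fatal(err)
}
defer consumerClient.Close(context.Background())
// Use producerClient and consumerClient here.
}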
Use Azure Event Hubs connection string
Azure Event Hubs also supports authentication using a connection string, which you can get from the Azure portal. For more information about getting a connection string, see Get an Event Hubs connection string.
See the example for NewConsumerClientFromConnectionString or NewProducerClientFromConnectionString.
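If you do use a connection string, a minimal sketch looks like the following. The EVENTHUB_CONNECTION_STRING environment variable is an assumption made for this example, and the Event Hub name is a placeholder.
package main
import (
"context"
"log"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// EVENTHUB_CONNECTION_STRING is assumed to hold the connection string copied from the Azure portal.
connectionString := os.Getenv("EVENTHUB_CONNECTION_STRING")
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(connectionString, "<your event hub>", nil)
if err != nil {
log.Fatal(err)
}
defer producerClient.Close(context.Background())
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(connectionString, "<your event hub>", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
log.Fatal(err)
}
defer consumerClient.Close(context.Background())
// Use producerClient and consumerClient here.
}
If your connection string includes an EntityPath, you can pass an empty string for the Event Hub name.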
Tools for sending, receiving and diagnosing Event Hubs
We’re going to build a few tools to demonstrate the features of the Event Hubs library, which you can use to monitor and test your own applications.
The tools:
- ehproducer: uses the ProducerClient to send events.
- ehprocessor: uses the Processor to consume from multiple partitions, with checkpointing.
- ehpartition: uses the PartitionClient to read from a single partition.
- ehlagcalc: calculates the lag between your Processor instances and the events available in your Event Hub.
To build these tools, complete the following steps:
- Create an empty folder.
- Create a new file in the folder (for example, main.go), and paste the utility's code into it.
- In a terminal, run the following commands:
go mod init utility
go mod tidy
go build
ehproducer: Producing events
This utility sends events using the ProducerClient. It takes events from stdin, places them into an EventDataBatch, and then sends the batch to Event Hubs. You can target a specific partition key or partition ID, or let Event Hubs distribute the events across partitions automatically.
For more information about how partitions work, see Partitioning on the Event Hubs Features page.
// This tool shows how to send events to an Event Hub, targeting a partition ID/partition key or allowing
// Event Hubs to choose the destination partition.
//
// For more information about partitioning see: https://learn.microsoft.com/azure/event-hubs/event-hubs-features#partitions
package main
import (
"bufio"
"context"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
if err := produceEventsTool(); err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
os.Exit(1)
}
}
func printProduceEventsExamples() {
fmt.Fprintf(os.Stderr, "Examples:\n"+
" # Send a single event to partition with ID \"partitionid\" from STDIN\n"+
" echo hello | ehproducer -namespace your-event-hub-namespace.servicebus.windows.net -eventhub tests -partition \"partitionid\"\n"+
"\n"+
" # Send a single event to partition with ID \"partitionid\" from a file\n"+
" ehproducer -namespace your-event-hub-namespace.servicebus.windows.net -eventhub tests -partition \"partitionid\" < samplemessage.txt\n"+
"\n"+
" # Send multiple events to partition with ID \"partitionid\" from a file\n"+
" ehproducer -namespace your-event-hub-namespace.servicebus.windows.net -eventhub testing -partition \"partitionid\" < file_with_one_message_per_line.txt\n",
)
}
func produceEventsTool() error {
fs := flag.NewFlagSet("ehproducer", flag.ContinueOnError)
eventHubNamespace := fs.String("namespace", "", "The fully qualified hostname of your Event Hub namespace (ex: <your event hub>.servicebus.windows.net)")
eventHubName := fs.String("eventhub", "", "The name of your Event Hub")
partitionKey := fs.String("partitionkey", "", "Partition key for events we send.")
partitionID := fs.String("partition", "", "Partition ID to send events to. By default, allows Event Hubs to assign a partition")
readMultiple := fs.Bool("multiple", false, "Whether each line of STDIN should be treated as a separate event, or if all the lines should be joined and sent as a single event")
verbose := fs.Bool("v", false, "Enable Azure SDK verbose logging")
if err := fs.Parse(os.Args[1:]); err != nil {
printProduceEventsExamples()
return err
}
// -namespace and -eventhub are required; -partitionkey and -partition are mutually exclusive.
if *eventHubNamespace == "" || *eventHubName == "" || (*partitionKey != "" && *partitionID != "") {
fs.PrintDefaults()
printProduceEventsExamples()
return errors.New("missing or conflicting command line arguments")
}
if *verbose {
azlog.SetEvents(azeventhubs.EventConsumer, azeventhubs.EventConn, azeventhubs.EventAuth, azeventhubs.EventProducer)
azlog.SetListener(func(e azlog.Event, s string) {
log.Printf("[%s] %s", e, s)
})
}
defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return err
}
producerClient, err := azeventhubs.NewProducerClient(*eventHubNamespace, *eventHubName, defaultAzureCred, nil)
if err != nil {
return err
}
defer producerClient.Close(context.Background())
batchOptions := &azeventhubs.EventDataBatchOptions{}
if *partitionKey != "" {
batchOptions.PartitionKey = partitionKey
}
if *partitionID != "" {
batchOptions.PartitionID = partitionID
}
batch, err := producerClient.NewEventDataBatch(context.Background(), batchOptions)
if err != nil {
return err
}
if err := readEventsFromStdin(*readMultiple, batch); err != nil {
return err
}
if err := producerClient.SendEventDataBatch(context.Background(), batch, nil); err != nil {
return err
}
fmt.Fprintf(os.Stderr, "Sent %d events, %d bytes\n", batch.NumEvents(), batch.NumBytes())
return nil
}
func readEventsFromStdin(readMultiple bool, batch *azeventhubs.EventDataBatch) error {
if readMultiple {
scanner := bufio.NewScanner(os.Stdin)
// This is a simplified approach and will fail if the size of the messages exceeds
// the maximum allowed size. For an example of how to handle this, see this example:
// https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#example-package-ProducingEventsUsingProducerClient
for scanner.Scan() {
if err := batch.AddEventData(&azeventhubs.EventData{
Body: scanner.Bytes(),
}, nil); err != nil {
return err
}
}
return scanner.Err()
} else {
bytes, err := io.ReadAll(os.Stdin)
if err != nil {
return err
}
return batch.AddEventData(&azeventhubs.EventData{
Body: bytes,
}, nil)
}
}
ehprocessor: Consuming events with checkpointing using the Processor
In the previous section, "ehproducer: Producing events", we produced events, sending them to partitions in our Event Hub.
This tool uses the Processor, a higher-level type that makes it easy to build an application that scales horizontally. It also provides a built-in mechanism for storing your progress using checkpoints.
// This tool lets you consume events using the Processor. The Processor
// tracks progress and can balance load between itself and other Processors,
// storing checkpoint information to Azure Storage Blobs.
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"os"
"time"
azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
)
func printProcessorExamples() {
fmt.Fprintf(os.Stderr, "\n"+
"Examples for processor:\n"+
" # Consume from multiple partitions, using the Processor and checkpointing\n"+
" ehprocessor -namespace <your event hub namespace>.servicebus.windows.net -eventhub tests -storageaccount https://<your storage account>.blob.core.windows.net -container <your storage container>\n"+
"\n")
}
func processorCmd() error {
eventHubNamespace := flag.String("namespace", "", "The fully qualified hostname of your Event Hub namespace (ex: <your event hub>.servicebus.windows.net)")
eventHubName := flag.String("eventhub", "", "The name of your Event Hub")
eventHubConsumerGroup := flag.String("consumergroup", azeventhubs.DefaultConsumerGroup, "The Event Hub consumer group used by your application")
maxBatchWaitTime := flag.Duration("wait", 30*time.Second, "Max wait time for events, per batch")
maxBatchSize := flag.Int("count", 1, "Maximum number of events to receive, per batch")
storageAccountURL := flag.String("storageaccount", "", "The storage account URL used by your blob store (ex: https://<storage account name>.blob.core.windows.net)")
storageContainerName := flag.String("container", "", "The storage container used by your checkpoints")
verbose := flag.Bool("v", false, "Enable Azure SDK verbose logging")
flag.Parse()
if *eventHubName == "" || *eventHubNamespace == "" || *eventHubConsumerGroup == "" || *storageAccountURL == "" || *storageContainerName == "" {
flag.PrintDefaults()
printProcessorExamples()
return errors.New("missing command line arguments")
}
if *verbose {
azlog.SetEvents(azeventhubs.EventConsumer, azeventhubs.EventConn, azeventhubs.EventAuth, azeventhubs.EventProducer)
azlog.SetListener(func(e azlog.Event, s string) {
log.Printf("[%s] %s", e, s)
})
}
defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return err
}
checkpointBlobStore, err := createCheckpointStore(storageAccountURL, defaultAzureCred, storageContainerName)
if err != nil {
return err
}
consumerClient, err := azeventhubs.NewConsumerClient(*eventHubNamespace, *eventHubName, *eventHubConsumerGroup, defaultAzureCred, nil)
if err != nil {
return err
}
defer consumerClient.Close(context.Background())
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointBlobStore, &azeventhubs.ProcessorOptions{
LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,
})
if err != nil {
return err
}
appCtx, appCancel := context.WithCancel(context.Background())
defer appCancel()
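// NextPartitionClient blocks until the Processor acquires ownership of a partition and
// returns nil once the Processor stops, so each acquired partition is handled in its own goroutine.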
dispatchPartitionClients := func() {
log.Printf("PartitionClient dispatcher has started...")
defer log.Printf("PartitionClient dispatcher has stopped.")
for {
pc := processor.NextPartitionClient(appCtx)
if pc == nil {
log.Println("Processor has stopped, stopping partition client dispatch loop")
break
}
log.Printf("Acquired partition %s, receiving", pc.PartitionID())
go processPartition(appCtx, pc, *maxBatchWaitTime, *maxBatchSize)
}
}
go dispatchPartitionClients()
log.Printf("Starting processor.")
if err := processor.Run(appCtx); err != nil {
return err
}
return nil
}
func createCheckpointStore(storageAccountURL *string, defaultAzureCred *azidentity.DefaultAzureCredential, storageContainerName *string) (azeventhubs.CheckpointStore, error) {
blobClient, err := azblob.NewClient(*storageAccountURL, defaultAzureCred, nil)
if err != nil {
return nil, err
}
containerClient := blobClient.ServiceClient().NewContainerClient(*storageContainerName)
log.Printf("Creating storage container %q, if it doesn't already exist", *storageContainerName)
if _, err := containerClient.Create(context.Background(), nil); err != nil {
if !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
return nil, err
}
}
return checkpoints.NewBlobStore(containerClient, nil)
}
func processPartition(ctx context.Context, pc *azeventhubs.ProcessorPartitionClient, eventHubMaxTime time.Duration, eventHubMaxSize int) {
defer pc.Close(ctx)
for {
receiveCtx, cancelReceive := context.WithTimeout(ctx, eventHubMaxTime)
events, err := pc.ReceiveEvents(receiveCtx, eventHubMaxSize, nil)
cancelReceive()
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
if ctx.Err() != nil { // parent cancelled
break
}
// timing out without any events is fine. Continue receiving...
continue
} else if err != nil {
log.Printf("ERROR while processing partition %q: %s", pc.PartitionID(), err)
break
}
if len(events) > 0 {
if err := printEventsAsJSON(pc.PartitionID(), events); err != nil {
log.Printf("ERROR: failed when printing events: %s", err)
break
}
latestEvent := events[len(events)-1]
log.Printf("[%s] Updating checkpoint with offset: %d, sequenceNumber: %d", pc.PartitionID(), latestEvent.SequenceNumber, latestEvent.Offset)
if err := pc.UpdateCheckpoint(ctx, latestEvent, nil); err != nil {
log.Printf("ERROR: failed when updating checkpoint: %s", err)
}
}
}
}
func printEventsAsJSON(partitionID string, events []*azeventhubs.ReceivedEventData) error {
for _, evt := range events {
var bodyBytes []int
for _, b := range evt.Body {
bodyBytes = append(bodyBytes, int(b))
}
// pick out some of the common fields
jsonBytes, err := json.Marshal(struct {
PartitionID string
MessageID any
BodyAsString string
Body []int
SequenceNumber int64
Offset int64
}{partitionID, evt.MessageID, string(evt.Body), bodyBytes, evt.SequenceNumber, evt.Offset})
if err != nil {
return fmt.Errorf("Failed to marshal received event with message ID %v: %s", evt.MessageID, err.Error())
}
fmt.Printf("%s\n", string(jsonBytes))
}
return nil
}
func main() {
if err := processorCmd(); err != nil {
fmt.Printf("ERROR: %s\n", err)
os.Exit(1)
}
}
ehlagcalc: Calculating consumption lag using the CheckpointStore
Consumers can sometimes fall behind when receiving events. We can get a rough estimate of how far behind we are by comparing the last checkpoint for a partition against the last enqueued sequence number.
// This tool queries metadata from Event Hubs and checks it against information stored in the checkpoint
// store to calculate the "lag" between our Processors and the service. It's best used as a rough approximation
// of state as the data sources are not necessarily in-sync when updates occur frequently.
package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
"sort"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)
func main() {
if err := checkpointLagTool(os.Args[1:]); err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
os.Exit(1)
}
}
func checkpointLagTool(commandLineArgs []string) error {
clients, err := createClients()
if err != nil {
return err
}
defer clients.ConsumerClient.Close(context.Background())
eventHubProps, err := clients.ConsumerClient.GetEventHubProperties(context.Background(), nil)
if err != nil {
return err
}
checkpoints, err := clients.CheckpointStore.ListCheckpoints(context.Background(), clients.EventHubNamespace, clients.EventHubName, clients.EventHubConsumerGroup, nil)
if err != nil {
return err
}
checkpointsMap := map[string]*azeventhubs.Checkpoint{}
for _, cp := range checkpoints {
cp := cp
checkpointsMap[cp.PartitionID] = &cp
}
ownerships, err := clients.CheckpointStore.ListOwnership(context.Background(), clients.EventHubNamespace, clients.EventHubName, clients.EventHubConsumerGroup, nil)
if err != nil {
return err
}
ownersMap := map[string]*azeventhubs.Ownership{}
for _, o := range ownerships {
o := o
ownersMap[o.PartitionID] = &o
}
sort.Strings(eventHubProps.PartitionIDs)
fmt.Fprintf(os.Stderr, "WARNING: Excessive querying of the checkpoint store/Event Hubs can impact application performance.\n")
for _, partID := range eventHubProps.PartitionIDs {
partID := partID
cp, o := checkpointsMap[partID], ownersMap[partID]
partProps, err := clients.ConsumerClient.GetPartitionProperties(context.Background(), partID, nil)
if err != nil {
return err
}
fmt.Printf("Partition ID %q\n", partID)
if o != nil {
fmt.Printf(" Owner ID: %q, last updated: %s\n", o.OwnerID, o.LastModifiedTime.Format(time.RFC3339))
} else {
fmt.Printf(" Owner ID: <no owner>\n")
}
fmt.Printf(" Last enqueued sequence number is %d\n", partProps.LastEnqueuedSequenceNumber)
if cp != nil && cp.SequenceNumber != nil {
fmt.Printf(" Delta (between service and checkpoint): %d\n", partProps.LastEnqueuedSequenceNumber-*cp.SequenceNumber)
}
}
return nil
}
type clients struct {
ConsumerClient *azeventhubs.ConsumerClient
CheckpointStore *checkpoints.BlobStore
EventHubNamespace string
EventHubName string
EventHubConsumerGroup string
}
func createClients() (*clients, error) {
eventHubNamespace := flag.String("namespace", "", "The fully qualified hostname of your Event Hub namespace (ex: <your event hub>.servicebus.windows.net)")
eventHubName := flag.String("eventhub", "", "The name of your Event Hub")
eventHubConsumerGroup := flag.String("consumergroup", azeventhubs.DefaultConsumerGroup, "The Event Hub consumer group used by your application")
storageAccountURL := flag.String("storageaccount", "", "The storage account URL used by your blob store (ex: https://<storage account name>.blob.core.windows.net/)")
storageContainerName := flag.String("container", "", "The storage container used by your checkpoints")
flag.Parse()
if *eventHubNamespace == "" || *eventHubName == "" || *eventHubConsumerGroup == "" || *storageAccountURL == "" || *storageContainerName == "" {
flag.PrintDefaults()
return nil, errors.New("Missing command line arguments")
}
defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, err
}
blobClient, err := azblob.NewClient(*storageAccountURL, defaultAzureCred, nil)
if err != nil {
return nil, err
}
blobStore, err := checkpoints.NewBlobStore(blobClient.ServiceClient().NewContainerClient(*storageContainerName), nil)
if err != nil {
return nil, err
}
// Both ProducerClient and ConsumerClient can query event hub partition properties.
cc, err := azeventhubs.NewConsumerClient(*eventHubNamespace, *eventHubName, *eventHubConsumerGroup, defaultAzureCred, nil)
if err != nil {
return nil, err
}
return &clients{
ConsumerClient: cc,
CheckpointStore: blobStore,
EventHubNamespace: *eventHubNamespace,
EventHubName: *eventHubName,
EventHubConsumerGroup: *eventHubConsumerGroup,
}, nil
}
ehpartition: Consuming events from a single partition, no checkpointing
The ConsumerClient can be used to receive events from specific partitions. It doesn't do load balancing or checkpointing, making it a good fit when you want complete control over which partitions are read and where reading starts.
// This tool lets you consume events from a single partition using the PartitionClient.
// The PartitionClient does not do checkpointing and can only consume from a single
// partition at a time. For checkpointing and load balancing, see the "ehprocessor" tool, which uses the Processor.
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
if err := partitionCmd(os.Args[1:]); err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
os.Exit(1)
}
}
func printPartitionExamples() {
fmt.Fprintf(os.Stderr, "\n"+
"Examples for partition:\n"+
" # Consume from after the latest event on partition \"partitionid\"\n"+
" ehpartition -namespace <your event hub namespace>. + servicebus.windows.net -eventhub tests -partition \"partitionid\"\n"+
"\n"+
" # Consume including the latest event on partition \"partitionid\"\n"+
" ehpartition -namespace <your event hub namespace>. + servicebus.windows.net -eventhub tests -partition \"partitionid\" -start \"@latest\" -inclusive\n"+
"\n"+
" # Consume from the beginning of partition \"partitionid\"\n"+
" ehpartition -namespace <your event hub namespace>. + servicebus.windows.net -eventhub tests -partition \"partitionid\" -start \"@earliest\"\n")
}
// partitionCmd handles receiving from a single partition using a PartitionClient
func partitionCmd(commandLineArgs []string) error {
fs := flag.NewFlagSet("partition", flag.ContinueOnError)
eventHubNamespace := fs.String("namespace", "", "The fully qualified hostname of your Event Hub namespace (ex: <your event hub>.servicebus.windows.net)")
eventHubName := fs.String("eventhub", "", "The name of your Event Hub")
eventHubConsumerGroup := fs.String("consumergroup", azeventhubs.DefaultConsumerGroup, "The Event Hub consumer group used by your application")
eventHubOwnerLevel := fs.Int64("ownerlevel", -1, "The owner level of your consumer")
partitionID := fs.String("partition", "", "Partition ID to receive events from")
startPositionStr := fs.String("start", "@latest", "Start position: @latest or @earliest or o:<offset> or s:<sequence number>")
startInclusive := fs.Bool("inclusive", false, "Include the event pointed to by the start position")
maxBatchWaitTime := fs.Duration("wait", 30*time.Second, "Max wait time for events, per batch")
maxBatchSize := fs.Int("count", 1, "Maximum number of events to receive, per batch")
if err := fs.Parse(commandLineArgs); err != nil {
printPartitionExamples()
return err
}
if *eventHubName == "" || *eventHubNamespace == "" || *eventHubConsumerGroup == "" || *partitionID == "" {
fs.PrintDefaults()
printPartitionExamples()
return errors.New("missing command line arguments")
}
defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return err
}
startPosition, startPosDesc, err := calculateStartPosition(*startPositionStr, *startInclusive)
if err != nil {
return err
}
// Using an owner level lets you control exclusivity when consuming a partition.
//
// See the PartitionClientOptions.OwnerLevel field for more details: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#PartitionClientOptions
ownerLevelDesc := fmt.Sprintf("%d", *eventHubOwnerLevel)
if *eventHubOwnerLevel == -1 {
eventHubOwnerLevel = nil
ownerLevelDesc = "<none>"
}
cc, err := azeventhubs.NewConsumerClient(*eventHubNamespace, *eventHubName, *eventHubConsumerGroup, defaultAzureCred, nil)
if err != nil {
return err
}
defer cc.Close(context.Background())
pc, err := cc.NewPartitionClient(*partitionID, &azeventhubs.PartitionClientOptions{
StartPosition: startPosition,
OwnerLevel: eventHubOwnerLevel,
})
if err != nil {
return err
}
log.Printf("Processing events from partition %s, %s, owner level: %s", *partitionID, startPosDesc, ownerLevelDesc)
processPartition(context.Background(), pc, *partitionID, *maxBatchWaitTime, *maxBatchSize)
return nil
}
func calculateStartPosition(startPositionStr string, startInclusive bool) (azeventhubs.StartPosition, string, error) {
startPosition := azeventhubs.StartPosition{
Inclusive: startInclusive,
}
startPosDesc := fmt.Sprintf("Inclusive: %t", startInclusive)
if strings.HasPrefix(startPositionStr, "s:") {
v, err := strconv.ParseInt((startPositionStr)[2:], 10, 64)
if err != nil {
return azeventhubs.StartPosition{}, "", fmt.Errorf("'%s' is an invalid start position", startPositionStr)
}
startPosDesc = fmt.Sprintf("sequence number: %d, %s", v, startPosDesc)
startPosition.SequenceNumber = &v
} else if strings.HasPrefix(startPositionStr, "o:") {
v, err := strconv.ParseInt((startPositionStr)[2:], 10, 64)
if err != nil {
return azeventhubs.StartPosition{}, "", fmt.Errorf("'%s' is an invalid start position", startPositionStr)
}
startPosDesc = fmt.Sprintf("offset: %d, %s", v, startPosDesc)
startPosition.Offset = &v
} else if startPositionStr == "@earliest" {
startPosDesc = "earliest, " + startPosDesc
startPosition.Earliest = to.Ptr(true)
} else if startPositionStr == "@latest" {
startPosDesc = "latest, " + startPosDesc
startPosition.Latest = to.Ptr(true)
} else {
return azeventhubs.StartPosition{}, "", fmt.Errorf("'%s' is an invalid start position", startPositionStr)
}
return startPosition, startPosDesc, nil
}
func processPartition(ctx context.Context, pc *azeventhubs.PartitionClient, partitionID string, eventHubMaxTime time.Duration, eventHubMaxSize int) {
defer pc.Close(ctx)
for {
receiveCtx, cancelReceive := context.WithTimeout(ctx, eventHubMaxTime)
events, err := pc.ReceiveEvents(receiveCtx, eventHubMaxSize, nil)
cancelReceive()
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
if ctx.Err() != nil { // parent cancelled
break
}
// timing out without any events is fine. Continue receiving...
continue
} else if err != nil {
log.Printf("ERROR while processing partition %q: %s", partitionID, err)
break
}
if err := printEventsAsJSON(partitionID, events); err != nil {
log.Printf("ERROR: %s", err)
break
}
}
}
func printEventsAsJSON(partitionID string, events []*azeventhubs.ReceivedEventData) error {
for _, evt := range events {
var bodyBytes []int
for _, b := range evt.Body {
bodyBytes = append(bodyBytes, int(b))
}
// pick out some of the common fields
jsonBytes, err := json.Marshal(struct {
PartitionID string
MessageID any
BodyAsString string
Body []int
SequenceNumber int64
Offset int64
}{partitionID, evt.MessageID, string(evt.Body), bodyBytes, evt.SequenceNumber, evt.Offset})
if err != nil {
return fmt.Errorf("Failed to marshal received event with message ID %v: %s", evt.MessageID, err.Error())
}
fmt.Printf("%s\n", string(jsonBytes))
}
return nil
}
Summary
The Azure Event Hubs client library for Go makes it easy for your Go applications to take advantage of the high throughput and scalability of Event Hubs.
To learn more about the azeventhubs package, see our documentation. You can also find more examples either on pkg.go.dev or in our GitHub repository.
Feedback
We'd love to hear about your experiences using the Azure SDK for Go. Send us your feedback in our Slack channel or in the #golang-friends channel on the Microsoft Open Source Discord Server.
For feature requests, bug reports, or general support, open an issue in the Azure SDK for Go repository on GitHub. For more information on how we triage issues, see the Azure SDK GitHub Issue Support Process.