Announcing the stable release of the Azure Event Hubs client library for Go

Richard Park

The Azure SDK for Go team at Microsoft is excited to announce the stable release of the Azure Event Hubs client library for Go. Azure Event Hubs is a fully managed, real-time data ingestion service that’s simple, trusted, and scalable.

NOTE: If you’re using the legacy Azure Event Hubs library for Go and would like to upgrade, see the migration guide.

Install the package

The Azure Event Hubs client library is named azeventhubs. To install the latest version of azeventhubs, use the go get command. You can use the Azure Identity library to authenticate the client application.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

# Optionally, if you also want to use Azure Identity for authentication
go get github.com/Azure/azure-sdk-for-go/sdk/azidentity

We assume that you have:

  • An Azure subscription with an Azure Event Hubs namespace.
  • A working development environment for Go version 1.18 or above.

For instructions on creating an Azure Event Hubs namespace, follow this step-by-step guide.

Create a client

azeventhubs has two clients:

  • The ProducerClient is used to send events to partitions, utilizing batches for efficiency.
  • The ConsumerClient is used to consume events from partitions. It’s also used by the Processor to receive events using checkpointing.

Both clients are concurrency-safe.

You can create either client using a TokenCredential type, such as DefaultAzureCredential from the Azure Identity library, or using an Azure Event Hubs connection string.

NOTE: We recommend you use the TokenCredential type for authentication. Connection string authentication isn’t recommended. For more information about going passwordless, see Passwordless connections for Azure services.

The DefaultAzureCredential combines several credential types into one easy-to-use type. It can authenticate using the Azure CLI, managed identities, and more. For more information about available credential types, see the azidentity documentation.

See the example for NewConsumerClient or NewProducerClient.

Use Azure Event Hubs connection string

Azure Event Hubs also supports authentication using a connection string, which you can get from the Azure portal. For more information about getting a connection string, see Get an Event Hubs connection string.

See the example for NewConsumerClientFromConnectionString or NewProducerClientFromConnectionString.

Tools for sending, receiving and diagnosing Event Hubs

We’re going to build a few tools to demonstrate the features of the Event Hubs library, which you can use to monitor and test your own applications.

A demonstration in the Windows Terminal showing two of the utilities in this article - producing events using the ehproducer and receiving events using the ehprocessor

The tools:

To build these tools, complete the following steps:

  1. Create an empty folder.
  2. Create a new file in the folder, and paste the utility’s code into a main.go file (ex: main.go).
  3. In a terminal, run the following commands:
    go mod init utility
    go mod tidy
    go build

ehproducer: Producing events

This utility sends events using the ProducerClient. It takes events from stdin, places them into an EventDataBatch, and then sends the batch to Event Hubs. You can also choose your own partition key/partition ID or send it to Event Hubs and let it distribute the events automatically.

For more information about how partitions work, see Partitioning on the Event Hubs Features page.

// This tool shows how to send events to an Event Hub, targeting a partition ID/partition key or allowing
// Event Hubs to choose the destination partition.
//
// For more information about partitioning see: https://learn.microsoft.com/azure/event-hubs/event-hubs-features#partitions

package main

import (
    "bufio"
    "context"
    "errors"
    "flag"
    "fmt"
    "io"
    "log"
    "os"

    azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
    if err := produceEventsTool(); err != nil {
        fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
        os.Exit(1)
    }
}

func printProduceEventsExamples() {
    fmt.Fprintf(os.Stderr, "Examples:\n"+
        "  # Send a single event to partition with ID \"partitionid\" from STDIN\n"+
        "  echo hello | ehproducer -namespace your-event-hub-namespace.servicebus.windows.net -eventhub tests -partition \"partitionid\"\n"+
        "\n"+
        "  # Send a single event to partition with ID \"partitionid\" from a file\n"+
        "  ehproducer -namespace your-event-hub-namespace.servicebus.windows.net -eventhub tests -partition \"partitionid\" < samplemessage.txt\n"+

        "\n"+
        "  # Send multiple events to partition with ID \"partitionid\" from a file\n"+
        "  ehproducer -namespace your-event-hub-namespace.servicebus.windows.net -eventhub testing -partition \"partitionid\" < file_with_one_message_per_line.txt\n",
    )
}

func produceEventsTool() error {
    fs := flag.NewFlagSet("ehproducer", flag.ContinueOnError)

    eventHubNamespace := fs.String("namespace", "", "The fully qualified hostname of your Event Hub namespace (ex: <your event hub>.servicebus.windows.net)")
    eventHubName := fs.String("eventhub", "", "The name of your Event Hub")
    partitionKey := fs.String("partitionkey", "", "Partition key for events we send.")
    partitionID := fs.String("partition", "", "Partition ID to send events to. By default, allows Event Hubs to assign a partition")
    readMultiple := fs.Bool("multiple", false, "Whether each line of STDIN should be treated as a separate event, or if all the lines should be joined and sent as a single event")

    verbose := fs.Bool("v", false, "Enable Azure SDK verbose logging")

    if err := fs.Parse(os.Args[1:]); err != nil {
        printProduceEventsExamples()
        return err
    }

    if *eventHubNamespace == "" || *eventHubName == "" && (*partitionKey == "" || *partitionID == "") {
        fs.PrintDefaults()
        printProduceEventsExamples()
        return errors.New("Missing command line arguments")
    }

    if *verbose {
        azlog.SetEvents(azeventhubs.EventConsumer, azeventhubs.EventConn, azeventhubs.EventAuth, azeventhubs.EventProducer)
        azlog.SetListener(func(e azlog.Event, s string) {
            log.Printf("[%s] %s", e, s)
        })
    }

    defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

    if err != nil {
        return err
    }

    producerClient, err := azeventhubs.NewProducerClient(*eventHubNamespace, *eventHubName, defaultAzureCred, nil)

    if err != nil {
        return err
    }

    defer producerClient.Close(context.Background())

    batchOptions := &azeventhubs.EventDataBatchOptions{}

    if *partitionKey != "" {
        batchOptions.PartitionKey = partitionKey
    }

    if *partitionID != "" {
        batchOptions.PartitionID = partitionID
    }

    batch, err := producerClient.NewEventDataBatch(context.Background(), batchOptions)

    if err != nil {
        return err
    }

    if err := readEventsFromStdin(*readMultiple, batch); err != nil {
        return err
    }

    if err := producerClient.SendEventDataBatch(context.Background(), batch, nil); err != nil {
        return err
    }

    fmt.Fprintf(os.Stderr, "Sent %d events, %d bytes\n", batch.NumEvents(), batch.NumBytes())
    return nil
}

func readEventsFromStdin(readMultiple bool, batch *azeventhubs.EventDataBatch) error {
    if readMultiple {
        scanner := bufio.NewScanner(os.Stdin)

        // This is a simplified approach and will fail if the size of the messages exceeds
        // the maximum allowed size. For an example of how to handle this, see this example:
        // https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#example-package-ProducingEventsUsingProducerClient
        for scanner.Scan() {
            if err := batch.AddEventData(&azeventhubs.EventData{
                Body: scanner.Bytes(),
            }, nil); err != nil {
                return err
            }
        }

        return scanner.Err()
    } else {
        bytes, err := io.ReadAll(os.Stdin)

        if err != nil {
            return err
        }

        return batch.AddEventData(&azeventhubs.EventData{
            Body: bytes,
        }, nil)
    }
}

ehprocessor: Consuming events with checkpointing using the Processor

In the previous section, “Tool: Producing events”, we produced events, sending them to partitions in our event hub.

This tool uses the Processor, a higher-level type that makes it easy to build an application that horizontally scales. It also provides a built-in mechanism for storing your progress using checkpoints.

// This tool lets you consume events in two ways using the Processor. The Processor
// tracks progress and can balance load between itself and other Processors,
// storing checkpoint information to Azure Storage Blobs.

package main

import (
    "context"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "log"
    "os"
    "time"

    azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
)

func printProcessorExamples() {
    fmt.Fprintf(os.Stderr, "\n"+
        "Examples for processor:\n"+
        "  # Consume from multiple partitions, using the Processor and checkpointing\n"+
        "  ehprocessor -namespace <your event hub namespace>.servicebus.windows.net -eventhub tests -storageaccount https://<your storage account>.blob.core.windows.net -container <your storage container>\n"+
        "\n")
}

func processorCmd() error {
    eventHubNamespace := flag.String("namespace", "", "The fully qualified hostname of your Event Hub namespace (ex: <your event hub>.servicebus.windows.net)")
    eventHubName := flag.String("eventhub", "", "The name of your Event Hub")
    eventHubConsumerGroup := flag.String("consumergroup", azeventhubs.DefaultConsumerGroup, "The Event Hub consumer group used by your application")

    maxBatchWaitTime := flag.Duration("wait", 30*time.Second, "Max wait time for events, per batch")
    maxBatchSize := flag.Int("count", 1, "Maximum number of events to receive, per batch")

    storageAccountURL := flag.String("storageaccount", "", "The storage account URL used by your blob store (ex: https://<storage account name>.blob.core.windows.net)")
    storageContainerName := flag.String("container", "", "The storage container used by your checkpoints")

    verbose := flag.Bool("v", false, "Enable Azure SDK verbose logging")

    flag.Parse()

    if *eventHubName == "" || *eventHubNamespace == "" || *eventHubConsumerGroup == "" || *storageAccountURL == "" || *storageContainerName == "" {
        flag.PrintDefaults()
        printProcessorExamples()

        return errors.New("missing command line arguments")
    }

    if *verbose {
        azlog.SetEvents(azeventhubs.EventConsumer, azeventhubs.EventConn, azeventhubs.EventAuth, azeventhubs.EventProducer)
        azlog.SetListener(func(e azlog.Event, s string) {
            log.Printf("[%s] %s", e, s)
        })
    }

    defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

    if err != nil {
        return err
    }

    checkpointBlobStore, err := createCheckpointStore(storageAccountURL, defaultAzureCred, storageContainerName)

    if err != nil {
        return err
    }

    consumerClient, err := azeventhubs.NewConsumerClient(*eventHubNamespace, *eventHubName, *eventHubConsumerGroup, defaultAzureCred, nil)

    if err != nil {
        return err
    }

    defer consumerClient.Close(context.Background())

    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointBlobStore, &azeventhubs.ProcessorOptions{
        LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,
    })

    if err != nil {
        return err
    }

    appCtx, appCancel := context.WithCancel(context.Background())
    defer appCancel()

    dispatchPartitionClients := func() {
        log.Printf("PartitionClient dispatcher has started...")
        defer log.Printf("PartitionClient dispatcher has stopped.")

        for {
            pc := processor.NextPartitionClient(appCtx)

            if pc == nil {
                log.Println("Processor has stopped, stopping partition client dispatch loop")
                break
            }

            log.Printf("Acquired partition %s, receiving", pc.PartitionID())

            go processPartition(appCtx, pc, *maxBatchWaitTime, *maxBatchSize)
        }
    }

    go dispatchPartitionClients()

    log.Printf("Starting processor.")
    if err := processor.Run(appCtx); err != nil {
        return err
    }

    return nil
}

func createCheckpointStore(storageAccountURL *string, defaultAzureCred *azidentity.DefaultAzureCredential, storageContainerName *string) (azeventhubs.CheckpointStore, error) {
    blobClient, err := azblob.NewClient(*storageAccountURL, defaultAzureCred, nil)

    if err != nil {
        return nil, err
    }

    containerClient := blobClient.ServiceClient().NewContainerClient(*storageContainerName)

    log.Printf("Creating storage container %q, if it doesn't already exist", *storageContainerName)

    if _, err := containerClient.Create(context.Background(), nil); err != nil {
        if !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
            return nil, err
        }
    }

    return checkpoints.NewBlobStore(containerClient, nil)
}

func processPartition(ctx context.Context, pc *azeventhubs.ProcessorPartitionClient, eventHubMaxTime time.Duration, eventHubMaxSize int) {
    defer pc.Close(ctx)

    for {
        receiveCtx, cancelReceive := context.WithTimeout(ctx, eventHubMaxTime)
        events, err := pc.ReceiveEvents(receiveCtx, eventHubMaxSize, nil)
        cancelReceive()

        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
            if ctx.Err() != nil { // parent cancelled
                break
            }

            // timing out without any events is fine. Continue receiving...
            continue
        } else if err != nil {
            log.Printf("ERROR while processing partition %q: %s", pc.PartitionID(), err)
            break
        }

        if len(events) > 0 {
            if err := printEventsAsJSON(pc.PartitionID(), events); err != nil {
                log.Printf("ERROR: failed when printing events: %s", err)
                break
            }

            latestEvent := events[len(events)-1]

            log.Printf("[%s] Updating checkpoint with offset: %d, sequenceNumber: %d", pc.PartitionID(), latestEvent.SequenceNumber, latestEvent.Offset)

            if err := pc.UpdateCheckpoint(ctx, latestEvent, nil); err != nil {
                log.Printf("ERROR: failed when updating checkpoint: %s", err)
            }
        }
    }
}

func printEventsAsJSON(partitionID string, events []*azeventhubs.ReceivedEventData) error {
    for _, evt := range events {
        var bodyBytes []int

        for _, b := range evt.Body {
            bodyBytes = append(bodyBytes, int(b))
        }

        // pick out some of the common fields
        jsonBytes, err := json.Marshal(struct {
            PartitionID    string
            MessageID      any
            BodyAsString   string
            Body           []int
            SequenceNumber int64
            Offset         int64
        }{partitionID, evt.MessageID, string(evt.Body), bodyBytes, evt.SequenceNumber, evt.Offset})

        if err != nil {
            return fmt.Errorf("Failed to marshal received event with message ID %v: %s", evt.MessageID, err.Error())
        }

        fmt.Printf("%s\n", string(jsonBytes))
    }

    return nil
}

func main() {
    if err := processorCmd(); err != nil {
        fmt.Printf("ERROR: %s\n", err)
        os.Exit(1)
    }
}

ehlagcalc: Calculating consumption lag using the CheckpointStore

Consumers can sometimes fall behind when receiving events. We can get a rough estimate of how far behind we are by comparing the last checkpoint for a partition against the last enqueued sequence number.

// This tool queries metadata from Event Hubs and checks it against information stored in the checkpoint
// store to calculate the "lag" between our Processors and the service. It's best used as a rough approximation
// of state as the data sources are not necessarily in-sync when updates occur frequently.

package main

import (
    "context"
    "errors"
    "flag"
    "fmt"
    "os"
    "sort"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

func main() {
    if err := checkpointLagTool(os.Args[1:]); err != nil {
        fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
        os.Exit(1)
    }
}

func checkpointLagTool(commandLineArgs []string) error {
    clients, err := createClients()

    if err != nil {
        return err
    }

    defer clients.ConsumerClient.Close(context.Background())

    eventHubProps, err := clients.ConsumerClient.GetEventHubProperties(context.Background(), nil)

    if err != nil {
        return err
    }

    checkpoints, err := clients.CheckpointStore.ListCheckpoints(context.Background(), clients.EventHubNamespace, clients.EventHubName, clients.EventHubConsumerGroup, nil)

    if err != nil {
        return err
    }

    checkpointsMap := map[string]*azeventhubs.Checkpoint{}

    for _, cp := range checkpoints {
        cp := cp
        checkpointsMap[cp.PartitionID] = &cp
    }
    ownerships, err := clients.CheckpointStore.ListOwnership(context.Background(), clients.EventHubNamespace, clients.EventHubName, clients.EventHubConsumerGroup, nil)

    if err != nil {
        return err
    }

    ownersMap := map[string]*azeventhubs.Ownership{}

    for _, o := range ownerships {
        o := o
        ownersMap[o.PartitionID] = &o
    }

    sort.Strings(eventHubProps.PartitionIDs)

    fmt.Fprintf(os.Stderr, "WARNING: Excessive querying of the checkpoint store/Event Hubs can impact application performance.\n")

    for _, partID := range eventHubProps.PartitionIDs {
        partID := partID

        cp, o := checkpointsMap[partID], ownersMap[partID]

        partProps, err := clients.ConsumerClient.GetPartitionProperties(context.Background(), partID, nil)

        if err != nil {
            return err
        }

        fmt.Printf("Partition ID %q\n", partID)

        if o != nil {
            fmt.Printf("  Owner ID: %q, last updated: %s\n", o.OwnerID, o.LastModifiedTime.Format(time.RFC3339))
        } else {
            fmt.Printf("  Owner ID: <no owner>\n")
        }

        fmt.Printf("  Last enqueued sequence number is %d\n", partProps.LastEnqueuedSequenceNumber)

        if cp != nil && cp.SequenceNumber != nil {
            fmt.Printf("  Delta (between service and checkpoint): %d\n", partProps.LastEnqueuedSequenceNumber-*cp.SequenceNumber)
        }
    }

    return nil
}

type clients struct {
    ConsumerClient  *azeventhubs.ConsumerClient
    CheckpointStore *checkpoints.BlobStore

    EventHubNamespace     string
    EventHubName          string
    EventHubConsumerGroup string
}

func createClients() (*clients, error) {
    eventHubNamespace := flag.String("namespace", "", "The fully qualified hostname of your Event Hub namespace (ex: <your event hub>.servicebus.windows.net)")
    eventHubName := flag.String("eventhub", "", "The name of your Event Hub")
    eventHubConsumerGroup := flag.String("consumergroup", azeventhubs.DefaultConsumerGroup, "The Event Hub consumer group used by your application")

    storageAccountURL := flag.String("storageaccount", "", "The storage account URL used by your blob store (ex: https://<storage account name>.blob.core.windows.net/)")
    storageContainerName := flag.String("container", "", "The storage container used by your checkpoints")

    flag.Parse()

    if *eventHubNamespace == "" || *eventHubName == "" || *eventHubConsumerGroup == "" || *storageAccountURL == "" || *storageContainerName == "" {
        flag.PrintDefaults()
        return nil, errors.New("Missing command line arguments")
    }

    defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

    if err != nil {
        return nil, err
    }

    blobClient, err := azblob.NewClient(*storageAccountURL, defaultAzureCred, nil)

    if err != nil {
        return nil, err
    }

    blobStore, err := checkpoints.NewBlobStore(blobClient.ServiceClient().NewContainerClient(*storageContainerName), nil)

    if err != nil {
        return nil, err
    }

    // Both ProducerClient and ConsumerClient can query event hub partition properties.
    cc, err := azeventhubs.NewConsumerClient(*eventHubNamespace, *eventHubName, *eventHubConsumerGroup, defaultAzureCred, nil)

    if err != nil {
        return nil, err
    }

    return &clients{
        ConsumerClient:        cc,
        CheckpointStore:       blobStore,
        EventHubNamespace:     *eventHubNamespace,
        EventHubName:          *eventHubName,
        EventHubConsumerGroup: *eventHubConsumerGroup,
    }, nil
}

ehpartition: Consuming events from a single partition, no checkpointing

The ConsumerClient can be used to receive events from specific partitions. The ConsumerClient doesn’t do load balancing or checkpointing and is perfect for when you want to have absolute control over which partitions are read and where you want to start reading.

// This tool lets you consume events from a single partition using the ProducerClient.
// The PartitionClient does not do checkpointing and can only consume from a single
// partition at a time. Look at the "ehprocessor" tool, which uses the Processor.

package main

import (
    "context"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "log"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
    if err := partitionCmd(os.Args[:]); err != nil {
        fmt.Printf("ERROR: %s\n", err)
        os.Exit(1)
    }
}

func printPartitionExamples() {
    fmt.Fprintf(os.Stderr, "\n"+
        "Examples for partition:\n"+
        "  # Consume from after the latest event on partition \"partitionid\"\n"+
        "  ehpartition -namespace <your event hub namespace>. + servicebus.windows.net -eventhub tests -partition \"partitionid\"\n"+
        "\n"+
        "  # Consume including the latest event on partition \"partitionid\"\n"+
        "  ehpartition -namespace <your event hub namespace>. + servicebus.windows.net -eventhub tests -partition \"partitionid\" -start \"@latest\" -inclusive\n"+
        "\n"+
        "  # Consume from the beginning of partition \"partitionid\"\n"+
        "  ehpartition -namespace <your event hub namespace>. + servicebus.windows.net -eventhub tests -partition \"partitionid\" -start \"@earliest\"\n")
}

// partitionCmd handles receiving from a single partition using a PartitionClient
func partitionCmd(commandLineArgs []string) error {
    fs := flag.NewFlagSet("partition", flag.ContinueOnError)

    eventHubNamespace := fs.String("namespace", "", "The fully qualified hostname of your Event Hub namespace (ex: <your event hub>.servicebus.windows.net)")
    eventHubName := fs.String("eventhub", "", "The name of your Event Hub")
    eventHubConsumerGroup := fs.String("consumergroup", azeventhubs.DefaultConsumerGroup, "The Event Hub consumer group used by your application")
    eventHubOwnerLevel := fs.Int64("ownerlevel", -1, "The owner level of your consumer")
    partitionID := fs.String("partition", "", "Partition ID to receive events from")

    startPositionStr := fs.String("start", "@latest", "Start position: @latest or @earliest or o:<offset> or s:<sequence number>")
    startInclusive := fs.Bool("inclusive", false, "Include the event pointed to by the start position")

    maxBatchWaitTime := fs.Duration("wait", 30*time.Second, "Max wait time for events, per batch")
    maxBatchSize := fs.Int("count", 1, "Maximum number of events to receive, per batch")

    if err := fs.Parse(commandLineArgs); err != nil {
        printPartitionExamples()
        return err
    }

    if *eventHubName == "" || *eventHubNamespace == "" || *eventHubConsumerGroup == "" || *partitionID == "" {
        fs.PrintDefaults()
        printPartitionExamples()
        return errors.New("missing command line arguments")
    }

    defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

    if err != nil {
        return err
    }

    startPosition, startPosDesc, err := calculateStartPosition(*startPositionStr, *startInclusive)

    if err != nil {
        return err
    }

    // Using an owner level lets you control exclusivity when consuming a partition.
    //
    // See the PartitionClientOptions.OwnerLevel field for more details: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#PartitionClientOptions
    ownerLevelDesc := fmt.Sprintf("%d", *eventHubOwnerLevel)

    if *eventHubOwnerLevel == -1 {
        eventHubOwnerLevel = nil
        ownerLevelDesc = "<none>"
    }

    cc, err := azeventhubs.NewConsumerClient(*eventHubNamespace, *eventHubName, *eventHubConsumerGroup, defaultAzureCred, nil)

    if err != nil {
        return err
    }

    defer cc.Close(context.Background())

    pc, err := cc.NewPartitionClient(*partitionID, &azeventhubs.PartitionClientOptions{
        StartPosition: startPosition,
        OwnerLevel:    eventHubOwnerLevel,
    })

    if err != nil {
        return err
    }

    log.Printf("Processing events from partition %s, %s, owner level: %s", *partitionID, startPosDesc, ownerLevelDesc)
    processPartition(context.Background(), pc, *partitionID, *maxBatchWaitTime, *maxBatchSize)
    return nil
}

func calculateStartPosition(startPositionStr string, startInclusive bool) (azeventhubs.StartPosition, string, error) {
    startPosition := azeventhubs.StartPosition{
        Inclusive: startInclusive,
    }

    startPosDesc := fmt.Sprintf("Inclusive: %t", startInclusive)

    if strings.HasPrefix(startPositionStr, "s:") {
        v, err := strconv.ParseInt((startPositionStr)[2:], 10, 64)

        if err != nil {
            return azeventhubs.StartPosition{}, "", fmt.Errorf("'%s' is an invalid start position", startPositionStr)
        }

        startPosDesc = fmt.Sprintf("sequence number: %d, %s", v, startPosDesc)
        startPosition.SequenceNumber = &v
    } else if strings.HasPrefix(startPositionStr, "o:") {
        v, err := strconv.ParseInt((startPositionStr)[2:], 10, 64)

        if err != nil {
            return azeventhubs.StartPosition{}, "", fmt.Errorf("'%s' is an invalid start position", startPositionStr)
        }

        startPosDesc = fmt.Sprintf("offset: %d, %s", v, startPosDesc)
        startPosition.Offset = &v
    } else if startPositionStr == "@earliest" {
        startPosDesc = "earliest, " + startPosDesc
        startPosition.Earliest = to.Ptr(true)
    } else if startPositionStr == "@latest" {
        startPosDesc = "latest, " + startPosDesc
        startPosition.Latest = to.Ptr(true)
    } else {
        return azeventhubs.StartPosition{}, "", fmt.Errorf("'%s' is an invalid start position", startPositionStr)
    }
    return startPosition, startPosDesc, nil
}

func processPartition(ctx context.Context, pc *azeventhubs.PartitionClient, partitionID string, eventHubMaxTime time.Duration, eventHubMaxSize int) {
    defer pc.Close(ctx)

    for {
        receiveCtx, cancelReceive := context.WithTimeout(ctx, eventHubMaxTime)
        events, err := pc.ReceiveEvents(receiveCtx, eventHubMaxSize, nil)
        cancelReceive()

        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
            if ctx.Err() != nil { // parent cancelled
                break
            }

            // timing out without any events is fine. Continue receiving...
            continue
        } else if err != nil {
            log.Printf("ERROR while processing partition %q: %s", partitionID, err)
            break
        }

        if err := printEventsAsJSON(partitionID, events); err != nil {
            log.Printf("ERROR: %s", err)
            break
        }
    }
}

func printEventsAsJSON(partitionID string, events []*azeventhubs.ReceivedEventData) error {
    for _, evt := range events {
        var bodyBytes []int

        for _, b := range evt.Body {
            bodyBytes = append(bodyBytes, int(b))
        }

        // pick out some of the common fields
        jsonBytes, err := json.Marshal(struct {
            PartitionID    string
            MessageID      any
            BodyAsString   string
            Body           []int
            SequenceNumber int64
            Offset         int64
        }{partitionID, evt.MessageID, string(evt.Body), bodyBytes, evt.SequenceNumber, evt.Offset})

        if err != nil {
            return fmt.Errorf("Failed to marshal received event with message ID %v: %s", evt.MessageID, err.Error())
        }

        fmt.Printf("%s\n", string(jsonBytes))
    }

    return nil
}

Summary

The Azure Event Hubs for Go library is the perfect complement to applications that want to take advantage of the high throughput and scalability of Event Hubs in Go programs.

To learn more about the azeventhubs package, see our documentation. You can also find more examples either on pkg.go.dev or in our GitHub repository.

Feedback

We’d love to hear about your experiences using the Azure SDK for Go. Send us your feedback on our Slack Channel or at the #golang-friends channel on the Microsoft Open Source Discord Server.

For feature requests, bug reports, or general support, open an issue in the Azure SDK for Go repository on GitHub. For more information on how we triage issues, see the Azure SDK GitHub Issue Support Process.

0 comments

Discussion is closed.

Feedback usabilla icon