{"id":2684,"date":"2023-05-25T05:21:18","date_gmt":"2023-05-25T12:21:18","guid":{"rendered":"https:\/\/devblogs.microsoft.com\/azure-sdk\/?p=2684"},"modified":"2023-05-25T05:21:18","modified_gmt":"2023-05-25T12:21:18","slug":"announcing-the-stable-release-of-the-azure-event-hubs-client-library-for-go","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/azure-sdk\/announcing-the-stable-release-of-the-azure-event-hubs-client-library-for-go\/","title":{"rendered":"Announcing the stable release of the Azure Event Hubs client library for Go"},"content":{"rendered":"<p>The Azure SDK for Go team at Microsoft is excited to announce the stable release of the <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs\">Azure Event Hubs client library for Go<\/a>. <a href=\"https:\/\/azure.microsoft.com\/products\/event-hubs\/\">Azure Event Hubs<\/a> is a fully managed, real-time data ingestion service that\u2019s simple, trusted, and scalable.<\/p>\n<blockquote><p>NOTE: If you&#8217;re using the legacy <a href=\"https:\/\/github.com\/Azure\/azure-event-hubs-go\">Azure Event Hubs library for Go<\/a> and would like to upgrade, see the <a href=\"https:\/\/github.com\/Azure\/azure-sdk-for-go\/blob\/main\/sdk\/messaging\/azeventhubs\/migrationguide.md\">migration guide<\/a>.<\/p><\/blockquote>\n<h2>Install the package<\/h2>\n<p>The Azure Event Hubs client library is named <code>azeventhubs<\/code>. To install the latest version of <code>azeventhubs<\/code>, use the <code>go get<\/code> command. You can use the Azure Identity library to authenticate the client application.<\/p>\n<pre><code class=\"language-bash\">go get github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs\r\n\r\n# Optionally, if you also want to use Azure Identity for authentication\r\ngo get github.com\/Azure\/azure-sdk-for-go\/sdk\/azidentity<\/code><\/pre>\n<p>We assume that you have:<\/p>\n<ul>\n<li>An Azure subscription with an Azure Event Hubs namespace.<\/li>\n<li>A working development environment for Go version 1.18 or above.<\/li>\n<\/ul>\n<p>For instructions on creating an Azure Event Hubs namespace, follow this <a href=\"https:\/\/learn.microsoft.com\/azure\/event-hubs\/event-hubs-create\">step-by-step guide<\/a>.<\/p>\n<h2>Create a client<\/h2>\n<p><code>azeventhubs<\/code> has two clients:<\/p>\n<ul>\n<li>The <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#ProducerClient\"><code>ProducerClient<\/code><\/a> is used to send events to partitions, utilizing batches for efficiency.<\/li>\n<li>The <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#ConsumerClient\"><code>ConsumerClient<\/code><\/a> is used to consume events from partitions. It&#8217;s also used by the <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#Processor\"><code>Processor<\/code><\/a> to receive events using checkpointing.<\/li>\n<\/ul>\n<p>Both clients are concurrency-safe.<\/p>\n<p>You can create either client using a <code>TokenCredential<\/code> type, such as <code>DefaultAzureCredential<\/code> from the Azure Identity library, or using an Azure Event Hubs <a href=\"https:\/\/learn.microsoft.com\/azure\/event-hubs\/event-hubs-get-connection-string\">connection string<\/a>.<\/p>\n<blockquote><p>NOTE: We recommend you use the <code>TokenCredential<\/code> type for authentication. Connection string authentication isn&#8217;t recommended. For more information about going passwordless, see <a href=\"https:\/\/learn.microsoft.com\/azure\/developer\/intro\/passwordless-overview\">Passwordless connections for Azure services<\/a>.<\/p><\/blockquote>\n<h3>Use the <code>DefaultAzureCredential<\/code> token credential (Recommended)<\/h3>\n<p>The <code>DefaultAzureCredential<\/code> combines several credential types into one easy-to-use type. It can authenticate using the Azure CLI, managed identities, and more. For more information about available credential types, see the <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/azidentity#section-readme\"><code>azidentity<\/code> documentation<\/a>.<\/p>\n<p>See the example for <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#example-NewConsumerClient\">NewConsumerClient<\/a> or <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#NewProducerClient\">NewProducerClient<\/a>.<\/p>\n<h3>Use Azure Event Hubs connection string<\/h3>\n<p>Azure Event Hubs also supports authentication using a connection string, which you can get from the Azure portal. For more information about getting a connection string, see <a href=\"https:\/\/learn.microsoft.com\/azure\/event-hubs\/event-hubs-get-connection-string\">Get an Event Hubs connection string<\/a>.<\/p>\n<p>See the example for <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#NewConsumerClientFromConnectionString\">NewConsumerClientFromConnectionString<\/a> or <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#NewProducerClientFromConnectionString\">NewProducerClientFromConnectionString<\/a>.<\/p>\n<h2>Tools for sending, receiving and diagnosing Event Hubs<\/h2>\n<p>We&#8217;re going to build a few tools to demonstrate the features of the Event Hubs library, which you can use to monitor and test your own applications.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-content\/uploads\/sites\/58\/2023\/05\/2023-eventhubs-cli.gif\" alt=\"A demonstration in the Windows Terminal showing two of the utilities in this article - producing events using the ehproducer and receiving events using the ehprocessor\" \/><\/p>\n<p>The tools:<\/p>\n<ul>\n<li><a href=\"#ehproducer-producing-events\"><code>ehproducer<\/code><\/a>: uses the <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#ProducerClient\"><code>ProducerClient<\/code><\/a> to send events.<\/li>\n<li><a href=\"#ehprocessor-consuming-events-with-checkpointing-using-the-processor\"><code>ehprocessor<\/code><\/a>: uses the <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#Processor\"><code>Processor<\/code><\/a> to consume multiple partitions, with checkpoints,<\/li>\n<li><a href=\"#ehpartition-consuming-events-from-a-single-partition-no-checkpointing\"><code>ehpartition<\/code><\/a>: uses the <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#PartitionClient\"><code>PartitionClient<\/code><\/a> to read from a single partition.<\/li>\n<li><a href=\"#ehlagcalc-calculating-consumption-lag-using-the-checkpointstore\"><code>ehlagcalc<\/code><\/a>: calculate lag between your <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#Processor\"><code>Processor<\/code><\/a> instances and the available events in your Event Hubs.<\/li>\n<\/ul>\n<p>To build these tools, complete the following steps:<\/p>\n<ol>\n<li>Create an empty folder.<\/li>\n<li>Create a new file in the folder, and paste the utility&#8217;s code into a main.go file (ex: main.go).<\/li>\n<li>In a terminal, run the following commands:\n<pre><code class=\"language-bash\">go mod init utility\r\ngo mod tidy\r\ngo build<\/code><\/pre>\n<\/li>\n<\/ol>\n<h3>ehproducer: Producing events<\/h3>\n<p>This utility sends events using the <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#ProducerClient\"><code>ProducerClient<\/code><\/a>. It takes events from stdin, places them into an <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#EventDataBatch\"><code>EventDataBatch<\/code><\/a>, and then sends the batch to Event Hubs. You can also choose your own partition key\/partition ID or send it to Event Hubs and let it distribute the events automatically.<\/p>\n<p>For more information about how partitions work, see <a href=\"https:\/\/learn.microsoft.com\/azure\/event-hubs\/event-hubs-features#partitions\">Partitioning on the Event Hubs Features page<\/a>.<\/p>\n<pre><code class=\"language-go\">\/\/ This tool shows how to send events to an Event Hub, targeting a partition ID\/partition key or allowing\r\n\/\/ Event Hubs to choose the destination partition.\r\n\/\/\r\n\/\/ For more information about partitioning see: https:\/\/learn.microsoft.com\/azure\/event-hubs\/event-hubs-features#partitions\r\n\r\npackage main\r\n\r\nimport (\r\n    \"bufio\"\r\n    \"context\"\r\n    \"errors\"\r\n    \"flag\"\r\n    \"fmt\"\r\n    \"io\"\r\n    \"log\"\r\n    \"os\"\r\n\r\n    azlog \"github.com\/Azure\/azure-sdk-for-go\/sdk\/azcore\/log\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/azidentity\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs\"\r\n)\r\n\r\nfunc main() {\r\n    if err := produceEventsTool(); err != nil {\r\n        fmt.Fprintf(os.Stderr, \"ERROR: %s\\n\", err)\r\n        os.Exit(1)\r\n    }\r\n}\r\n\r\nfunc printProduceEventsExamples() {\r\n    fmt.Fprintf(os.Stderr, \"Examples:\\n\"+\r\n        \"  # Send a single event to partition with ID \\\"partitionid\\\" from STDIN\\n\"+\r\n        \"  echo hello | ehproducer -namespace your-event-hub-namespace.servicebus.windows.net -eventhub tests -partition \\\"partitionid\\\"\\n\"+\r\n        \"\\n\"+\r\n        \"  # Send a single event to partition with ID \\\"partitionid\\\" from a file\\n\"+\r\n        \"  ehproducer -namespace your-event-hub-namespace.servicebus.windows.net -eventhub tests -partition \\\"partitionid\\\" &lt; samplemessage.txt\\n\"+\r\n\r\n        \"\\n\"+\r\n        \"  # Send multiple events to partition with ID \\\"partitionid\\\" from a file\\n\"+\r\n        \"  ehproducer -namespace your-event-hub-namespace.servicebus.windows.net -eventhub testing -partition \\\"partitionid\\\" &lt; file_with_one_message_per_line.txt\\n\",\r\n    )\r\n}\r\n\r\nfunc produceEventsTool() error {\r\n    fs := flag.NewFlagSet(\"ehproducer\", flag.ContinueOnError)\r\n\r\n    eventHubNamespace := fs.String(\"namespace\", \"\", \"The fully qualified hostname of your Event Hub namespace (ex: &lt;your event hub&gt;.servicebus.windows.net)\")\r\n    eventHubName := fs.String(\"eventhub\", \"\", \"The name of your Event Hub\")\r\n    partitionKey := fs.String(\"partitionkey\", \"\", \"Partition key for events we send.\")\r\n    partitionID := fs.String(\"partition\", \"\", \"Partition ID to send events to. By default, allows Event Hubs to assign a partition\")\r\n    readMultiple := fs.Bool(\"multiple\", false, \"Whether each line of STDIN should be treated as a separate event, or if all the lines should be joined and sent as a single event\")\r\n\r\n    verbose := fs.Bool(\"v\", false, \"Enable Azure SDK verbose logging\")\r\n\r\n    if err := fs.Parse(os.Args[1:]); err != nil {\r\n        printProduceEventsExamples()\r\n        return err\r\n    }\r\n\r\n    if *eventHubNamespace == \"\" || *eventHubName == \"\" &amp;&amp; (*partitionKey == \"\" || *partitionID == \"\") {\r\n        fs.PrintDefaults()\r\n        printProduceEventsExamples()\r\n        return errors.New(\"Missing command line arguments\")\r\n    }\r\n\r\n    if *verbose {\r\n        azlog.SetEvents(azeventhubs.EventConsumer, azeventhubs.EventConn, azeventhubs.EventAuth, azeventhubs.EventProducer)\r\n        azlog.SetListener(func(e azlog.Event, s string) {\r\n            log.Printf(\"[%s] %s\", e, s)\r\n        })\r\n    }\r\n\r\n    defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    producerClient, err := azeventhubs.NewProducerClient(*eventHubNamespace, *eventHubName, defaultAzureCred, nil)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    defer producerClient.Close(context.Background())\r\n\r\n    batchOptions := &amp;azeventhubs.EventDataBatchOptions{}\r\n\r\n    if *partitionKey != \"\" {\r\n        batchOptions.PartitionKey = partitionKey\r\n    }\r\n\r\n    if *partitionID != \"\" {\r\n        batchOptions.PartitionID = partitionID\r\n    }\r\n\r\n    batch, err := producerClient.NewEventDataBatch(context.Background(), batchOptions)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    if err := readEventsFromStdin(*readMultiple, batch); err != nil {\r\n        return err\r\n    }\r\n\r\n    if err := producerClient.SendEventDataBatch(context.Background(), batch, nil); err != nil {\r\n        return err\r\n    }\r\n\r\n    fmt.Fprintf(os.Stderr, \"Sent %d events, %d bytes\\n\", batch.NumEvents(), batch.NumBytes())\r\n    return nil\r\n}\r\n\r\nfunc readEventsFromStdin(readMultiple bool, batch *azeventhubs.EventDataBatch) error {\r\n    if readMultiple {\r\n        scanner := bufio.NewScanner(os.Stdin)\r\n\r\n        \/\/ This is a simplified approach and will fail if the size of the messages exceeds\r\n        \/\/ the maximum allowed size. For an example of how to handle this, see this example:\r\n        \/\/ https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#example-package-ProducingEventsUsingProducerClient\r\n        for scanner.Scan() {\r\n            if err := batch.AddEventData(&amp;azeventhubs.EventData{\r\n                Body: scanner.Bytes(),\r\n            }, nil); err != nil {\r\n                return err\r\n            }\r\n        }\r\n\r\n        return scanner.Err()\r\n    } else {\r\n        bytes, err := io.ReadAll(os.Stdin)\r\n\r\n        if err != nil {\r\n            return err\r\n        }\r\n\r\n        return batch.AddEventData(&amp;azeventhubs.EventData{\r\n            Body: bytes,\r\n        }, nil)\r\n    }\r\n}<\/code><\/pre>\n<h3>ehprocessor: Consuming events with checkpointing using the Processor<\/h3>\n<p>In the previous section, <a href=\"#ehproducer-producing-events\">&#8220;Tool: Producing events&#8221;<\/a>, we produced events, sending them to partitions in our event hub.<\/p>\n<p>This tool uses the <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#Processor\"><code>Processor<\/code><\/a>, a higher-level type that makes it easy to build an application that horizontally scales. It also provides a built-in mechanism for storing your progress using checkpoints.<\/p>\n<pre><code class=\"language-go\">\/\/ This tool lets you consume events in two ways using the Processor. The Processor\r\n\/\/ tracks progress and can balance load between itself and other Processors,\r\n\/\/ storing checkpoint information to Azure Storage Blobs.\r\n\r\npackage main\r\n\r\nimport (\r\n    \"context\"\r\n    \"encoding\/json\"\r\n    \"errors\"\r\n    \"flag\"\r\n    \"fmt\"\r\n    \"log\"\r\n    \"os\"\r\n    \"time\"\r\n\r\n    azlog \"github.com\/Azure\/azure-sdk-for-go\/sdk\/azcore\/log\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/azidentity\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs\/checkpoints\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/storage\/azblob\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/storage\/azblob\/bloberror\"\r\n)\r\n\r\nfunc printProcessorExamples() {\r\n    fmt.Fprintf(os.Stderr, \"\\n\"+\r\n        \"Examples for processor:\\n\"+\r\n        \"  # Consume from multiple partitions, using the Processor and checkpointing\\n\"+\r\n        \"  ehprocessor -namespace &lt;your event hub namespace&gt;.servicebus.windows.net -eventhub tests -storageaccount https:\/\/&lt;your storage account&gt;.blob.core.windows.net -container &lt;your storage container&gt;\\n\"+\r\n        \"\\n\")\r\n}\r\n\r\nfunc processorCmd() error {\r\n    eventHubNamespace := flag.String(\"namespace\", \"\", \"The fully qualified hostname of your Event Hub namespace (ex: &lt;your event hub&gt;.servicebus.windows.net)\")\r\n    eventHubName := flag.String(\"eventhub\", \"\", \"The name of your Event Hub\")\r\n    eventHubConsumerGroup := flag.String(\"consumergroup\", azeventhubs.DefaultConsumerGroup, \"The Event Hub consumer group used by your application\")\r\n\r\n    maxBatchWaitTime := flag.Duration(\"wait\", 30*time.Second, \"Max wait time for events, per batch\")\r\n    maxBatchSize := flag.Int(\"count\", 1, \"Maximum number of events to receive, per batch\")\r\n\r\n    storageAccountURL := flag.String(\"storageaccount\", \"\", \"The storage account URL used by your blob store (ex: https:\/\/&lt;storage account name&gt;.blob.core.windows.net)\")\r\n    storageContainerName := flag.String(\"container\", \"\", \"The storage container used by your checkpoints\")\r\n\r\n    verbose := flag.Bool(\"v\", false, \"Enable Azure SDK verbose logging\")\r\n\r\n    flag.Parse()\r\n\r\n    if *eventHubName == \"\" || *eventHubNamespace == \"\" || *eventHubConsumerGroup == \"\" || *storageAccountURL == \"\" || *storageContainerName == \"\" {\r\n        flag.PrintDefaults()\r\n        printProcessorExamples()\r\n\r\n        return errors.New(\"missing command line arguments\")\r\n    }\r\n\r\n    if *verbose {\r\n        azlog.SetEvents(azeventhubs.EventConsumer, azeventhubs.EventConn, azeventhubs.EventAuth, azeventhubs.EventProducer)\r\n        azlog.SetListener(func(e azlog.Event, s string) {\r\n            log.Printf(\"[%s] %s\", e, s)\r\n        })\r\n    }\r\n\r\n    defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    checkpointBlobStore, err := createCheckpointStore(storageAccountURL, defaultAzureCred, storageContainerName)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    consumerClient, err := azeventhubs.NewConsumerClient(*eventHubNamespace, *eventHubName, *eventHubConsumerGroup, defaultAzureCred, nil)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    defer consumerClient.Close(context.Background())\r\n\r\n    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointBlobStore, &amp;azeventhubs.ProcessorOptions{\r\n        LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,\r\n    })\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    appCtx, appCancel := context.WithCancel(context.Background())\r\n    defer appCancel()\r\n\r\n    dispatchPartitionClients := func() {\r\n        log.Printf(\"PartitionClient dispatcher has started...\")\r\n        defer log.Printf(\"PartitionClient dispatcher has stopped.\")\r\n\r\n        for {\r\n            pc := processor.NextPartitionClient(appCtx)\r\n\r\n            if pc == nil {\r\n                log.Println(\"Processor has stopped, stopping partition client dispatch loop\")\r\n                break\r\n            }\r\n\r\n            log.Printf(\"Acquired partition %s, receiving\", pc.PartitionID())\r\n\r\n            go processPartition(appCtx, pc, *maxBatchWaitTime, *maxBatchSize)\r\n        }\r\n    }\r\n\r\n    go dispatchPartitionClients()\r\n\r\n    log.Printf(\"Starting processor.\")\r\n    if err := processor.Run(appCtx); err != nil {\r\n        return err\r\n    }\r\n\r\n    return nil\r\n}\r\n\r\nfunc createCheckpointStore(storageAccountURL *string, defaultAzureCred *azidentity.DefaultAzureCredential, storageContainerName *string) (azeventhubs.CheckpointStore, error) {\r\n    blobClient, err := azblob.NewClient(*storageAccountURL, defaultAzureCred, nil)\r\n\r\n    if err != nil {\r\n        return nil, err\r\n    }\r\n\r\n    containerClient := blobClient.ServiceClient().NewContainerClient(*storageContainerName)\r\n\r\n    log.Printf(\"Creating storage container %q, if it doesn't already exist\", *storageContainerName)\r\n\r\n    if _, err := containerClient.Create(context.Background(), nil); err != nil {\r\n        if !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {\r\n            return nil, err\r\n        }\r\n    }\r\n\r\n    return checkpoints.NewBlobStore(containerClient, nil)\r\n}\r\n\r\nfunc processPartition(ctx context.Context, pc *azeventhubs.ProcessorPartitionClient, eventHubMaxTime time.Duration, eventHubMaxSize int) {\r\n    defer pc.Close(ctx)\r\n\r\n    for {\r\n        receiveCtx, cancelReceive := context.WithTimeout(ctx, eventHubMaxTime)\r\n        events, err := pc.ReceiveEvents(receiveCtx, eventHubMaxSize, nil)\r\n        cancelReceive()\r\n\r\n        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {\r\n            if ctx.Err() != nil { \/\/ parent cancelled\r\n                break\r\n            }\r\n\r\n            \/\/ timing out without any events is fine. Continue receiving...\r\n            continue\r\n        } else if err != nil {\r\n            log.Printf(\"ERROR while processing partition %q: %s\", pc.PartitionID(), err)\r\n            break\r\n        }\r\n\r\n        if len(events) &gt; 0 {\r\n            if err := printEventsAsJSON(pc.PartitionID(), events); err != nil {\r\n                log.Printf(\"ERROR: failed when printing events: %s\", err)\r\n                break\r\n            }\r\n\r\n            latestEvent := events[len(events)-1]\r\n\r\n            log.Printf(\"[%s] Updating checkpoint with offset: %d, sequenceNumber: %d\", pc.PartitionID(), latestEvent.SequenceNumber, latestEvent.Offset)\r\n\r\n            if err := pc.UpdateCheckpoint(ctx, latestEvent, nil); err != nil {\r\n                log.Printf(\"ERROR: failed when updating checkpoint: %s\", err)\r\n            }\r\n        }\r\n    }\r\n}\r\n\r\nfunc printEventsAsJSON(partitionID string, events []*azeventhubs.ReceivedEventData) error {\r\n    for _, evt := range events {\r\n        var bodyBytes []int\r\n\r\n        for _, b := range evt.Body {\r\n            bodyBytes = append(bodyBytes, int(b))\r\n        }\r\n\r\n        \/\/ pick out some of the common fields\r\n        jsonBytes, err := json.Marshal(struct {\r\n            PartitionID    string\r\n            MessageID      any\r\n            BodyAsString   string\r\n            Body           []int\r\n            SequenceNumber int64\r\n            Offset         int64\r\n        }{partitionID, evt.MessageID, string(evt.Body), bodyBytes, evt.SequenceNumber, evt.Offset})\r\n\r\n        if err != nil {\r\n            return fmt.Errorf(\"Failed to marshal received event with message ID %v: %s\", evt.MessageID, err.Error())\r\n        }\r\n\r\n        fmt.Printf(\"%s\\n\", string(jsonBytes))\r\n    }\r\n\r\n    return nil\r\n}\r\n\r\nfunc main() {\r\n    if err := processorCmd(); err != nil {\r\n        fmt.Printf(\"ERROR: %s\\n\", err)\r\n        os.Exit(1)\r\n    }\r\n}<\/code><\/pre>\n<h3>ehlagcalc: Calculating consumption lag using the CheckpointStore<\/h3>\n<p>Consumers can sometimes fall behind when receiving events. We can get a rough estimate of how far behind we are by comparing the last checkpoint for a partition against the last enqueued sequence number.<\/p>\n<pre><code class=\"language-go\">\/\/ This tool queries metadata from Event Hubs and checks it against information stored in the checkpoint\r\n\/\/ store to calculate the \"lag\" between our Processors and the service. It's best used as a rough approximation\r\n\/\/ of state as the data sources are not necessarily in-sync when updates occur frequently.\r\n\r\npackage main\r\n\r\nimport (\r\n    \"context\"\r\n    \"errors\"\r\n    \"flag\"\r\n    \"fmt\"\r\n    \"os\"\r\n    \"sort\"\r\n    \"time\"\r\n\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/azidentity\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs\/checkpoints\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/storage\/azblob\"\r\n)\r\n\r\nfunc main() {\r\n    if err := checkpointLagTool(os.Args[1:]); err != nil {\r\n        fmt.Fprintf(os.Stderr, \"ERROR: %s\\n\", err)\r\n        os.Exit(1)\r\n    }\r\n}\r\n\r\nfunc checkpointLagTool(commandLineArgs []string) error {\r\n    clients, err := createClients()\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    defer clients.ConsumerClient.Close(context.Background())\r\n\r\n    eventHubProps, err := clients.ConsumerClient.GetEventHubProperties(context.Background(), nil)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    checkpoints, err := clients.CheckpointStore.ListCheckpoints(context.Background(), clients.EventHubNamespace, clients.EventHubName, clients.EventHubConsumerGroup, nil)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    checkpointsMap := map[string]*azeventhubs.Checkpoint{}\r\n\r\n    for _, cp := range checkpoints {\r\n        cp := cp\r\n        checkpointsMap[cp.PartitionID] = &amp;cp\r\n    }\r\n    ownerships, err := clients.CheckpointStore.ListOwnership(context.Background(), clients.EventHubNamespace, clients.EventHubName, clients.EventHubConsumerGroup, nil)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    ownersMap := map[string]*azeventhubs.Ownership{}\r\n\r\n    for _, o := range ownerships {\r\n        o := o\r\n        ownersMap[o.PartitionID] = &amp;o\r\n    }\r\n\r\n    sort.Strings(eventHubProps.PartitionIDs)\r\n\r\n    fmt.Fprintf(os.Stderr, \"WARNING: Excessive querying of the checkpoint store\/Event Hubs can impact application performance.\\n\")\r\n\r\n    for _, partID := range eventHubProps.PartitionIDs {\r\n        partID := partID\r\n\r\n        cp, o := checkpointsMap[partID], ownersMap[partID]\r\n\r\n        partProps, err := clients.ConsumerClient.GetPartitionProperties(context.Background(), partID, nil)\r\n\r\n        if err != nil {\r\n            return err\r\n        }\r\n\r\n        fmt.Printf(\"Partition ID %q\\n\", partID)\r\n\r\n        if o != nil {\r\n            fmt.Printf(\"  Owner ID: %q, last updated: %s\\n\", o.OwnerID, o.LastModifiedTime.Format(time.RFC3339))\r\n        } else {\r\n            fmt.Printf(\"  Owner ID: &lt;no owner&gt;\\n\")\r\n        }\r\n\r\n        fmt.Printf(\"  Last enqueued sequence number is %d\\n\", partProps.LastEnqueuedSequenceNumber)\r\n\r\n        if cp != nil &amp;&amp; cp.SequenceNumber != nil {\r\n            fmt.Printf(\"  Delta (between service and checkpoint): %d\\n\", partProps.LastEnqueuedSequenceNumber-*cp.SequenceNumber)\r\n        }\r\n    }\r\n\r\n    return nil\r\n}\r\n\r\ntype clients struct {\r\n    ConsumerClient  *azeventhubs.ConsumerClient\r\n    CheckpointStore *checkpoints.BlobStore\r\n\r\n    EventHubNamespace     string\r\n    EventHubName          string\r\n    EventHubConsumerGroup string\r\n}\r\n\r\nfunc createClients() (*clients, error) {\r\n    eventHubNamespace := flag.String(\"namespace\", \"\", \"The fully qualified hostname of your Event Hub namespace (ex: &lt;your event hub&gt;.servicebus.windows.net)\")\r\n    eventHubName := flag.String(\"eventhub\", \"\", \"The name of your Event Hub\")\r\n    eventHubConsumerGroup := flag.String(\"consumergroup\", azeventhubs.DefaultConsumerGroup, \"The Event Hub consumer group used by your application\")\r\n\r\n    storageAccountURL := flag.String(\"storageaccount\", \"\", \"The storage account URL used by your blob store (ex: https:\/\/&lt;storage account name&gt;.blob.core.windows.net\/)\")\r\n    storageContainerName := flag.String(\"container\", \"\", \"The storage container used by your checkpoints\")\r\n\r\n    flag.Parse()\r\n\r\n    if *eventHubNamespace == \"\" || *eventHubName == \"\" || *eventHubConsumerGroup == \"\" || *storageAccountURL == \"\" || *storageContainerName == \"\" {\r\n        flag.PrintDefaults()\r\n        return nil, errors.New(\"Missing command line arguments\")\r\n    }\r\n\r\n    defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)\r\n\r\n    if err != nil {\r\n        return nil, err\r\n    }\r\n\r\n    blobClient, err := azblob.NewClient(*storageAccountURL, defaultAzureCred, nil)\r\n\r\n    if err != nil {\r\n        return nil, err\r\n    }\r\n\r\n    blobStore, err := checkpoints.NewBlobStore(blobClient.ServiceClient().NewContainerClient(*storageContainerName), nil)\r\n\r\n    if err != nil {\r\n        return nil, err\r\n    }\r\n\r\n    \/\/ Both ProducerClient and ConsumerClient can query event hub partition properties.\r\n    cc, err := azeventhubs.NewConsumerClient(*eventHubNamespace, *eventHubName, *eventHubConsumerGroup, defaultAzureCred, nil)\r\n\r\n    if err != nil {\r\n        return nil, err\r\n    }\r\n\r\n    return &amp;clients{\r\n        ConsumerClient:        cc,\r\n        CheckpointStore:       blobStore,\r\n        EventHubNamespace:     *eventHubNamespace,\r\n        EventHubName:          *eventHubName,\r\n        EventHubConsumerGroup: *eventHubConsumerGroup,\r\n    }, nil\r\n}<\/code><\/pre>\n<h3>ehpartition: Consuming events from a single partition, no checkpointing<\/h3>\n<p>The <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#ConsumerClient\"><code>ConsumerClient<\/code><\/a> can be used to receive events from specific partitions. The <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#ConsumerClient\"><code>ConsumerClient<\/code><\/a> doesn&#8217;t do load balancing or checkpointing and is perfect for when you want to have absolute control over which partitions are read and where you want to start reading.<\/p>\n<pre><code class=\"language-go\">\/\/ This tool lets you consume events from a single partition using the ProducerClient.\r\n\/\/ The PartitionClient does not do checkpointing and can only consume from a single\r\n\/\/ partition at a time. Look at the \"ehprocessor\" tool, which uses the Processor.\r\n\r\npackage main\r\n\r\nimport (\r\n    \"context\"\r\n    \"encoding\/json\"\r\n    \"errors\"\r\n    \"flag\"\r\n    \"fmt\"\r\n    \"log\"\r\n    \"os\"\r\n    \"strconv\"\r\n    \"strings\"\r\n    \"time\"\r\n\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/azcore\/to\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/azidentity\"\r\n    \"github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs\"\r\n)\r\n\r\nfunc main() {\r\n    if err := partitionCmd(os.Args[:]); err != nil {\r\n        fmt.Printf(\"ERROR: %s\\n\", err)\r\n        os.Exit(1)\r\n    }\r\n}\r\n\r\nfunc printPartitionExamples() {\r\n    fmt.Fprintf(os.Stderr, \"\\n\"+\r\n        \"Examples for partition:\\n\"+\r\n        \"  # Consume from after the latest event on partition \\\"partitionid\\\"\\n\"+\r\n        \"  ehpartition -namespace &lt;your event hub namespace&gt;. + servicebus.windows.net -eventhub tests -partition \\\"partitionid\\\"\\n\"+\r\n        \"\\n\"+\r\n        \"  # Consume including the latest event on partition \\\"partitionid\\\"\\n\"+\r\n        \"  ehpartition -namespace &lt;your event hub namespace&gt;. + servicebus.windows.net -eventhub tests -partition \\\"partitionid\\\" -start \\\"@latest\\\" -inclusive\\n\"+\r\n        \"\\n\"+\r\n        \"  # Consume from the beginning of partition \\\"partitionid\\\"\\n\"+\r\n        \"  ehpartition -namespace &lt;your event hub namespace&gt;. + servicebus.windows.net -eventhub tests -partition \\\"partitionid\\\" -start \\\"@earliest\\\"\\n\")\r\n}\r\n\r\n\/\/ partitionCmd handles receiving from a single partition using a PartitionClient\r\nfunc partitionCmd(commandLineArgs []string) error {\r\n    fs := flag.NewFlagSet(\"partition\", flag.ContinueOnError)\r\n\r\n    eventHubNamespace := fs.String(\"namespace\", \"\", \"The fully qualified hostname of your Event Hub namespace (ex: &lt;your event hub&gt;.servicebus.windows.net)\")\r\n    eventHubName := fs.String(\"eventhub\", \"\", \"The name of your Event Hub\")\r\n    eventHubConsumerGroup := fs.String(\"consumergroup\", azeventhubs.DefaultConsumerGroup, \"The Event Hub consumer group used by your application\")\r\n    eventHubOwnerLevel := fs.Int64(\"ownerlevel\", -1, \"The owner level of your consumer\")\r\n    partitionID := fs.String(\"partition\", \"\", \"Partition ID to receive events from\")\r\n\r\n    startPositionStr := fs.String(\"start\", \"@latest\", \"Start position: @latest or @earliest or o:&lt;offset&gt; or s:&lt;sequence number&gt;\")\r\n    startInclusive := fs.Bool(\"inclusive\", false, \"Include the event pointed to by the start position\")\r\n\r\n    maxBatchWaitTime := fs.Duration(\"wait\", 30*time.Second, \"Max wait time for events, per batch\")\r\n    maxBatchSize := fs.Int(\"count\", 1, \"Maximum number of events to receive, per batch\")\r\n\r\n    if err := fs.Parse(commandLineArgs); err != nil {\r\n        printPartitionExamples()\r\n        return err\r\n    }\r\n\r\n    if *eventHubName == \"\" || *eventHubNamespace == \"\" || *eventHubConsumerGroup == \"\" || *partitionID == \"\" {\r\n        fs.PrintDefaults()\r\n        printPartitionExamples()\r\n        return errors.New(\"missing command line arguments\")\r\n    }\r\n\r\n    defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    startPosition, startPosDesc, err := calculateStartPosition(*startPositionStr, *startInclusive)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    \/\/ Using an owner level lets you control exclusivity when consuming a partition.\r\n    \/\/\r\n    \/\/ See the PartitionClientOptions.OwnerLevel field for more details: https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#PartitionClientOptions\r\n    ownerLevelDesc := fmt.Sprintf(\"%d\", *eventHubOwnerLevel)\r\n\r\n    if *eventHubOwnerLevel == -1 {\r\n        eventHubOwnerLevel = nil\r\n        ownerLevelDesc = \"&lt;none&gt;\"\r\n    }\r\n\r\n    cc, err := azeventhubs.NewConsumerClient(*eventHubNamespace, *eventHubName, *eventHubConsumerGroup, defaultAzureCred, nil)\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    defer cc.Close(context.Background())\r\n\r\n    pc, err := cc.NewPartitionClient(*partitionID, &amp;azeventhubs.PartitionClientOptions{\r\n        StartPosition: startPosition,\r\n        OwnerLevel:    eventHubOwnerLevel,\r\n    })\r\n\r\n    if err != nil {\r\n        return err\r\n    }\r\n\r\n    log.Printf(\"Processing events from partition %s, %s, owner level: %s\", *partitionID, startPosDesc, ownerLevelDesc)\r\n    processPartition(context.Background(), pc, *partitionID, *maxBatchWaitTime, *maxBatchSize)\r\n    return nil\r\n}\r\n\r\nfunc calculateStartPosition(startPositionStr string, startInclusive bool) (azeventhubs.StartPosition, string, error) {\r\n    startPosition := azeventhubs.StartPosition{\r\n        Inclusive: startInclusive,\r\n    }\r\n\r\n    startPosDesc := fmt.Sprintf(\"Inclusive: %t\", startInclusive)\r\n\r\n    if strings.HasPrefix(startPositionStr, \"s:\") {\r\n        v, err := strconv.ParseInt((startPositionStr)[2:], 10, 64)\r\n\r\n        if err != nil {\r\n            return azeventhubs.StartPosition{}, \"\", fmt.Errorf(\"'%s' is an invalid start position\", startPositionStr)\r\n        }\r\n\r\n        startPosDesc = fmt.Sprintf(\"sequence number: %d, %s\", v, startPosDesc)\r\n        startPosition.SequenceNumber = &amp;v\r\n    } else if strings.HasPrefix(startPositionStr, \"o:\") {\r\n        v, err := strconv.ParseInt((startPositionStr)[2:], 10, 64)\r\n\r\n        if err != nil {\r\n            return azeventhubs.StartPosition{}, \"\", fmt.Errorf(\"'%s' is an invalid start position\", startPositionStr)\r\n        }\r\n\r\n        startPosDesc = fmt.Sprintf(\"offset: %d, %s\", v, startPosDesc)\r\n        startPosition.Offset = &amp;v\r\n    } else if startPositionStr == \"@earliest\" {\r\n        startPosDesc = \"earliest, \" + startPosDesc\r\n        startPosition.Earliest = to.Ptr(true)\r\n    } else if startPositionStr == \"@latest\" {\r\n        startPosDesc = \"latest, \" + startPosDesc\r\n        startPosition.Latest = to.Ptr(true)\r\n    } else {\r\n        return azeventhubs.StartPosition{}, \"\", fmt.Errorf(\"'%s' is an invalid start position\", startPositionStr)\r\n    }\r\n    return startPosition, startPosDesc, nil\r\n}\r\n\r\nfunc processPartition(ctx context.Context, pc *azeventhubs.PartitionClient, partitionID string, eventHubMaxTime time.Duration, eventHubMaxSize int) {\r\n    defer pc.Close(ctx)\r\n\r\n    for {\r\n        receiveCtx, cancelReceive := context.WithTimeout(ctx, eventHubMaxTime)\r\n        events, err := pc.ReceiveEvents(receiveCtx, eventHubMaxSize, nil)\r\n        cancelReceive()\r\n\r\n        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {\r\n            if ctx.Err() != nil { \/\/ parent cancelled\r\n                break\r\n            }\r\n\r\n            \/\/ timing out without any events is fine. Continue receiving...\r\n            continue\r\n        } else if err != nil {\r\n            log.Printf(\"ERROR while processing partition %q: %s\", partitionID, err)\r\n            break\r\n        }\r\n\r\n        if err := printEventsAsJSON(partitionID, events); err != nil {\r\n            log.Printf(\"ERROR: %s\", err)\r\n            break\r\n        }\r\n    }\r\n}\r\n\r\nfunc printEventsAsJSON(partitionID string, events []*azeventhubs.ReceivedEventData) error {\r\n    for _, evt := range events {\r\n        var bodyBytes []int\r\n\r\n        for _, b := range evt.Body {\r\n            bodyBytes = append(bodyBytes, int(b))\r\n        }\r\n\r\n        \/\/ pick out some of the common fields\r\n        jsonBytes, err := json.Marshal(struct {\r\n            PartitionID    string\r\n            MessageID      any\r\n            BodyAsString   string\r\n            Body           []int\r\n            SequenceNumber int64\r\n            Offset         int64\r\n        }{partitionID, evt.MessageID, string(evt.Body), bodyBytes, evt.SequenceNumber, evt.Offset})\r\n\r\n        if err != nil {\r\n            return fmt.Errorf(\"Failed to marshal received event with message ID %v: %s\", evt.MessageID, err.Error())\r\n        }\r\n\r\n        fmt.Printf(\"%s\\n\", string(jsonBytes))\r\n    }\r\n\r\n    return nil\r\n}<\/code><\/pre>\n<h2>Summary<\/h2>\n<p>The Azure Event Hubs for Go library is the perfect complement to applications that want to take advantage of the high throughput and scalability of Event Hubs in Go programs.<\/p>\n<p>To learn more about the azeventhubs package, see our <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#section-readme\">documentation<\/a>. You can also find more examples either on <a href=\"https:\/\/pkg.go.dev\/github.com\/Azure\/azure-sdk-for-go\/sdk\/messaging\/azeventhubs#pkg-examples\">pkg.go.dev<\/a> or in our <a href=\"https:\/\/github.com\/Azure\/azure-sdk-for-go\/tree\/main\/sdk\/messaging\/azeventhubs\">GitHub repository<\/a>.<\/p>\n<h2>Feedback<\/h2>\n<p>We&#8217;d love to hear about your experiences using the Azure SDK for Go. Send us your feedback on our <a href=\"https:\/\/gophers.slack.com\/archives\/CA7HK8EEP\">Slack Channel<\/a> or at the <a href=\"https:\/\/discordapp.com\/channels\/723347736853741589\/933781546815606885\">#golang-friends<\/a> channel on the Microsoft Open Source Discord Server.<\/p>\n<p>For feature requests, bug reports, or general support, <a href=\"https:\/\/github.com\/Azure\/azure-sdk-for-go\/issues\/new\/choose\">open an issue<\/a> in the Azure SDK for Go repository on GitHub. For more information on how we triage issues, see the <a href=\"https:\/\/devblogs.microsoft.com\/azure-sdk\/github-issue-support-process\/\">Azure SDK GitHub Issue Support Process<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Announcing the stable release of the Azure Event Hubs client library for Go<\/p>\n","protected":false},"author":45459,"featured_media":2695,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[1],"tags":[910,909,797,810,811,24],"class_list":["post-2684","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-azure-sdk","tag-azeventhubs","tag-azure-event-hubs-go","tag-eventhubs","tag-go","tag-golang","tag-releases"],"acf":[],"blog_post_summary":"<p>Announcing the stable release of the Azure Event Hubs client library for Go<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/posts\/2684","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/users\/45459"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/comments?post=2684"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/posts\/2684\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/media\/2695"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/media?parent=2684"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/categories?post=2684"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/azure-sdk\/wp-json\/wp\/v2\/tags?post=2684"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}