July 22nd, 2022

Announcing the stable release of the Azure Service Bus client library for Go

Richard Park
Developer

We’re excited to announce the stable release of the Azure Service Bus client library for Go. Azure Service Bus is an enterprise cloud messaging service that helps you build scalable cloud solutions.

NOTE: If you’re using the previous Azure Service Bus library for Go and would like to upgrade, we’ve put together a migration guide that you can use to port your application over.

Install the package

The Azure Service Bus library can be installed using go get:

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus

# Optionally, if you also want to use Azure Identity for authentication
go get github.com/Azure/azure-sdk-for-go/sdk/azidentity

This blog post assumes you have a working development environment for Go 1.18 or above, and that you have an Azure Service Bus namespace.

For instructions on creating an Azure Service Bus namespace, follow this step-by-step guide.

Creating the Client

The Service Bus Client is used to create Senders and Receivers. Senders and Receivers allow you to send messages to Queues and Topics and receive messages from Queues and Subscriptions, respectively.

You can create a Client using a Service Bus connection string (obtained via the Azure portal) or with a TokenCredential (for example, DefaultAzureCredential) from the Azure Identity library.

Using the DefaultAzureCredential token credential

The DefaultAzureCredential combines several credential types into one easy-to-use type. It can authenticate using the Azure CLI, managed identities, and more. For more information, see the azidentity documentation.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
    // For more information about the DefaultAzureCredential:
    // https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#NewDefaultAzureCredential
    tokenCredential, err := azidentity.NewDefaultAzureCredential(nil)

    if err != nil {
        panic(err)
    }

    client, err := azservicebus.NewClient("<ex: my-service-bus.servicebus.windows.net>", tokenCredential, nil)

    if err != nil {
        panic(err)
    }

    defer client.Close(context.TODO())
}

Using a Service Bus connection string

Azure Service Bus also supports authentication using a connection string, which you can get from the portal. Connection strings can be scoped to the namespace or to individual queues or topics.

For instructions on getting a connection string, see the documentation here: link.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
    client, err := azservicebus.NewClientFromConnectionString("<Service Bus connection string>", nil)

    if err != nil {
        panic(err)
    }

    defer client.Close(context.TODO())
}

Sending messages

Using a Sender, you can send Messages.

A Message can carry any data that can be converted into a slice of bytes, which you assign to the Body field. It also contains fields like Subject and ApplicationProperties, which you can use to attach metadata to the message.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
    // NOTE: See the "Using the `DefaultAzureCredential`" section above to use an Azure
    // Identity credential instead of a connection string.
    client, err := azservicebus.NewClientFromConnectionString("<Service Bus connection string>", nil)

    if err != nil {
        panic(err)
    }

    defer client.Close(context.TODO())

    sender, err := client.NewSender("<queue or topic>", nil)

    if err != nil {
        panic(err)
    }

    defer sender.Close(context.TODO())

    subject := "greetings"

    // send a single message
    err = sender.SendMessage(context.TODO(), &azservicebus.Message{
        Body:    []byte("hello world!"),
        Subject: &subject,
        ApplicationProperties: map[string]interface{}{
            "GreetingType": "jovial",
        },
    })

    if err != nil {
        panic(err)
    }
}
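Because Body is just a slice of bytes, structured data has to be serialized before it is sent and deserialized after it is received. The sketch below shows one way to do that with encoding/json from the standard library. The Greeting type and the two helper functions are hypothetical names used only for this example, and JSON is one choice among many encodings.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Greeting is a hypothetical application payload, used only for illustration.
type Greeting struct {
    Text string `json:"text"`
    Kind string `json:"kind"`
}

// encodeGreeting serializes a Greeting into the bytes you would assign to
// the Body field of a message.
func encodeGreeting(g Greeting) ([]byte, error) {
    return json.Marshal(g)
}

// decodeGreeting reverses encodeGreeting, as a receiver would do with the
// Body of a received message.
func decodeGreeting(body []byte) (Greeting, error) {
    var g Greeting
    err := json.Unmarshal(body, &g)
    return g, err
}

func main() {
    body, err := encodeGreeting(Greeting{Text: "hello world!", Kind: "jovial"})
    if err != nil {
        panic(err)
    }
    fmt.Printf("bytes for Body: %s\n", body)

    g, err := decodeGreeting(body)
    if err != nil {
        panic(err)
    }
    fmt.Printf("decoded: %+v\n", g)
}
```

In a real sender, the result of encodeGreeting would take the place of []byte("hello world!") in the sample above.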

For improved efficiency, you can also send multiple messages in a single batch:

package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
    // NOTE: See the "Using the `DefaultAzureCredential`" section above to use an Azure
    // Identity credential instead of a connection string.
    client, err := azservicebus.NewClientFromConnectionString("<Service Bus connection string>", nil)

    if err != nil {
        panic(err)
    }

    defer client.Close(context.TODO())

    sender, err := client.NewSender("<queue or topic>", nil)

    if err != nil {
        panic(err)
    }

    defer sender.Close(context.TODO())

    // create a batch that will hold multiple messages
    batch, err := sender.NewMessageBatch(context.TODO(), nil)

    if err != nil {
        panic(err)
    }

    messagesToAdd := []*azservicebus.Message{
        {
            Body: []byte("hello world"),
            ApplicationProperties: map[string]interface{}{
                "GreetingType": "jovial",
            },
        },
        {
            Body: []byte("hello, again"),
            ApplicationProperties: map[string]interface{}{
                "GreetingType": "jovial",
            },
        },
    }

    fmt.Printf("Adding messages to batches...\n")

    var batchesToSend []*azservicebus.MessageBatch

    for i := 0; i < len(messagesToAdd); i++ {
        err := batch.AddMessage(messagesToAdd[i], nil)

        if errors.Is(err, azservicebus.ErrMessageTooLarge) {
            if batch.NumMessages() == 0 {
                fmt.Printf("Message is too large to fit into a batch. Will need to be resized/split.\n")
                panic(err)
            }

            fmt.Printf("Current batch is full, contains %d bytes, %d message(s)\n", batch.NumBytes(), batch.NumMessages())
            batchesToSend = append(batchesToSend, batch)

            newBatch, err := sender.NewMessageBatch(context.TODO(), nil)

            if err != nil {
                panic(err)
            }

            fmt.Printf("New batch created, resuming adding messages\n")
            batch = newBatch

            // retry adding this message to the batch.
            i--
        } else if err != nil {
            panic(err)
        }
    }

    if batch.NumMessages() > 0 {
        fmt.Printf("Adding final batch, contains %d bytes, %d message(s)\n", batch.NumBytes(), batch.NumMessages())
        batchesToSend = append(batchesToSend, batch)
    }

    for i, batch := range batchesToSend {
        fmt.Printf("Sending batch #%d...\n", i)
        if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
            panic(err)
        }
    }
}
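The loop above follows a fill-then-rotate pattern: keep adding messages until the batch reports it is full, set that batch aside, start a new one, and retry the message that did not fit. The sketch below models the same pattern with plain byte slices so it can run without a Service Bus namespace. It is a simplified illustration: splitIntoBatches and the maxBytes limit are assumptions for this example, and the size check counts only payload bytes, whereas a real batch also accounts for message overhead and a limit set by the service.

```go
package main

import "fmt"

// splitIntoBatches groups payloads so that the payload bytes in each batch
// never exceed maxBytes. It returns an error if a single payload cannot fit
// into an empty batch, mirroring the "too large" case in the sample above.
func splitIntoBatches(payloads [][]byte, maxBytes int) ([][][]byte, error) {
    var batches [][][]byte
    var current [][]byte
    currentBytes := 0

    for _, p := range payloads {
        if len(p) > maxBytes {
            return nil, fmt.Errorf("payload of %d bytes exceeds the batch limit of %d", len(p), maxBytes)
        }

        if currentBytes+len(p) > maxBytes {
            // The current batch is full: set it aside and start a new one.
            batches = append(batches, current)
            current = nil
            currentBytes = 0
        }

        current = append(current, p)
        currentBytes += len(p)
    }

    // Keep the final, partially filled batch.
    if len(current) > 0 {
        batches = append(batches, current)
    }

    return batches, nil
}

func main() {
    payloads := [][]byte{[]byte("hello world"), []byte("hello, again"), []byte("bye")}

    batches, err := splitIntoBatches(payloads, 24)
    if err != nil {
        panic(err)
    }

    for i, b := range batches {
        fmt.Printf("batch #%d holds %d message(s)\n", i, len(b))
    }
}
```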

Receiving messages using the Receiver

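Using a Receiver, you can receive messages from a queue or subscription and then settle them. The following example receives up to 10 messages at a time, then processes and completes them in parallel: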

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
    // NOTE: See the "Using the `DefaultAzureCredential`" section above to use an Azure
    // Identity credential instead of a connection string.
    client, err := azservicebus.NewClientFromConnectionString("<Service Bus connection string>", nil)

    if err != nil {
        panic(err)
    }

    defer client.Close(context.TODO())

    receiver, err := client.NewReceiverForQueue(
        "<queue>",
        nil)
    // or, if you want to connect to a subscription for a topic:
    // client.NewReceiverForSubscription("<topic>", "<subscription>", nil)

    if err != nil {
        panic(err)
    }

    defer receiver.Close(context.TODO())

    // this loop will receive up to 10 messages each iteration and
    // process and complete them in parallel.
    for {
        messages, err := receiver.ReceiveMessages(context.TODO(), 10, nil)

        if err != nil {
            panic(err)
        }

        wg := sync.WaitGroup{}

        for _, message := range messages {
            wg.Add(1)

            go func(m *azservicebus.ReceivedMessage) {
                defer wg.Done()

                // use the message

                greetingType, ok := m.ApplicationProperties["GreetingType"].(string)

                if !ok {
                    // property isn't a string, or wasn't in the map
                    panic(errors.New("GreetingType application property wasn't present or wasn't a string"))
                }

                // The .Body of the message is just bytes - for our example we'll just assume
                // it's the bytes of a string.
                fmt.Printf("Message received with contents '%s', greeting type '%s'\n", string(m.Body), greetingType)

                // For more information about settling messages:
                // https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
                if err := receiver.CompleteMessage(context.TODO(), m, nil); err != nil {
                    panic(err)
                }
            }(message)
        }

        wg.Wait()
    }
}

For more information about the Receiver, see the API documentation and examples.

Summary

We hope this post offered some insight into the Azure Service Bus package. We welcome any feedback or suggestions at the azure-sdk-for-go GitHub repository.

We have more examples than what we’ve highlighted here in the blog post. You can view them through pkg.go.dev (link) or in GitHub (link).

