Отправка сообщений в очереди Служебной шины Azure и получение сообщений из них (Go)
Из этого руководства вы узнаете, как с помощью языка Go отправлять сообщения в очереди Служебной шины Azure и получать сообщения из них.
Служебная шина Azure — это полностью управляемый брокер сообщений корпоративного типа с поддержкой очередей сообщений и возможностей публикации/подписки. Служебная шина используется для отделения приложений и служб друг от друга с обеспечением распределенной, надежной и высокопроизводительной транспортировки сообщений.
Пакет azservicebus Azure SDK для Go позволяет отправлять и получать сообщения из Служебной шины Azure и использовать язык программирования Go.
По завершении работы с этим руководством вы сможете отправлять отдельные сообщения или пакеты сообщений в очередь, получать сообщения и уведомления о недоставленных сообщениях, которые не обрабатываются.
Предварительные требования
- Подписка Azure. Вы можете активировать преимущества подписчика Visual Studio или MSDN или зарегистрироваться для получения бесплатной учетной записи.
- Если у вас нет подходящей очереди служебной шины, создайте ее с помощью портала Azure.
- Go версии 1.18 или более поздней версии
Создание примера приложения
Для начала создайте новый модуль Go.
Создайте для этого модуля новый каталог с именем
service-bus-go-how-to-use-queues
.В каталоге
azservicebus
инициализируйте модуль и установите необходимые пакеты.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Создайте файл с именем
main.go
.
Аутентификация и создание клиента
В файле main.go
создайте новую функцию с именем GetClient
и добавьте следующий код:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
Функция GetClient
возвращает новый объект azservicebus.Client
, созданный с помощью пространства имен Служебная шина Azure и учетных данных. Пространство имен предоставляется переменной среды AZURE_SERVICEBUS_HOSTNAME
. Учетные данные создаются с помощью функции azidentity.NewDefaultAzureCredential
.
Для локальной разработки функция DefaultAzureCredential
использовала маркер доступа из Azure CLI, который можно создать, выполнив команду az login
для проверки подлинности в Azure.
Совет
Для проверки подлинности с помощью строки подключения используйте функцию NewClientFromConnectionString.
Отправка сообщений в очередь
В файле main.go
создайте новую функцию с именем SendMessage
и добавьте следующий код:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
принимает два параметра: строку сообщения и объект azservicebus.Client
. Затем он создает новый объект azservicebus.Sender
и отправляет сообщение в очередь. Чтобы отправить массив сообщений, добавьте функцию SendMessageBatch
в файл main.go
.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
принимает два параметра: срез сообщений и объект azservicebus.Client
. Затем он создает новый объект azservicebus.Sender
и отправляет сообщения в очередь.
Получение сообщений из очереди
После отправки сообщений в очередь их можно получить с типом azservicebus.Receiver
. Чтобы получать сообщения из очереди, добавьте функцию GetMessage
в файл main.go
.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
принимает объект azservicebus.Client
и создает новый объект azservicebus.Receiver
. Затем он получает сообщения из очереди. Функция Receiver.ReceiveMessages
принимает два параметра: контекст и количество получаемых сообщений. Функция Receiver.ReceiveMessages
возвращает срез объектов azservicebus.ReceivedMessage
.
Затем цикл for
выполняет итерации сообщений и выводит текст сообщения. После этого функция CompleteMessage
вызывается для завершения сообщения, удаляя его из очереди.
Сообщения, которые превышают ограничение по длине, отправлены в недопустимую очередь или не были обработаны, ставятся в очередь недоставленных сообщений. Чтобы поставить сообщения в очередь недоставленных сообщений, добавьте функцию SendDeadLetterMessage
в файл main.go
.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
принимает объект azservicebus.Client
и объект azservicebus.ReceivedMessage
. После этого сообщение направляется в очередь недоставленных сообщений. Функция принимает два параметра: контекст и объект azservicebus.DeadLetterOptions
. Функция Receiver.DeadLetterMessage
возвращает ошибку, если сообщение не отправляется в очередь недоставленных сообщений.
Чтобы получить сообщения из очереди недоставленных сообщений, добавьте функцию ReceiveDeadLetterMessage
в файл main.go
.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
принимает объект azservicebus.Client
и создает новый объект azservicebus.Receiver
с параметрами для очереди недоставленных сообщений. Затем она получает сообщения из очереди недоставленных сообщений. Затем функция получает одно сообщение из очереди недоставленных сообщений. После этого она выводит причину сбоя передачи сообщения и его описание.
Пример кода
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Выполнение кода
Перед запуском кода создайте переменную среды с именем AZURE_SERVICEBUS_HOSTNAME
. Задайте для переменной среды значение пространства имен Служебной шины Azure.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Выполните следующую команду go run
, чтобы запустить приложение:
go run main.go
Дальнейшие действия
Дополнительные сведения приведены в следующих статьях: