Отправка сообщений в очереди Служебной шины Azure и получение сообщений из них (Go)

Из этого руководства вы узнаете, как с помощью языка Go отправлять сообщения в очереди Служебной шины Azure и получать сообщения из них.

Служебная шина Azure — это полностью управляемый брокер сообщений корпоративного типа с поддержкой очередей сообщений и возможностей публикации/подписки. Служебная шина используется для отделения приложений и служб друг от друга с обеспечением распределенной, надежной и высокопроизводительной транспортировки сообщений.

Пакет azservicebus Azure SDK для Go позволяет отправлять и получать сообщения из Служебной шины Azure и использовать язык программирования Go.

По завершении работы с этим руководством вы сможете отправлять отдельные сообщения или пакеты сообщений в очередь, получать сообщения и уведомления о недоставленных сообщениях, которые не обрабатываются.

Предварительные требования

Создание примера приложения

Для начала создайте новый модуль Go.

  1. Создайте для этого модуля новый каталог с именем service-bus-go-how-to-use-queues.

  2. В каталоге azservicebus инициализируйте модуль и установите необходимые пакеты.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Создайте файл с именем main.go.

Аутентификация и создание клиента

В файле main.go создайте новую функцию с именем GetClient и добавьте следующий код:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

Функция GetClient возвращает новый объект azservicebus.Client, созданный с помощью пространства имен Служебная шина Azure и учетных данных. Пространство имен предоставляется переменной среды AZURE_SERVICEBUS_HOSTNAME. Учетные данные создаются с помощью функции azidentity.NewDefaultAzureCredential.

Для локальной разработки функция DefaultAzureCredential использовала маркер доступа из Azure CLI, который можно создать, выполнив команду az login для проверки подлинности в Azure.

Совет

Для проверки подлинности с помощью строки подключения используйте функцию NewClientFromConnectionString.

Отправка сообщений в очередь

В файле main.go создайте новую функцию с именем SendMessage и добавьте следующий код:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage принимает два параметра: строку сообщения и объект azservicebus.Client. Затем он создает новый объект azservicebus.Sender и отправляет сообщение в очередь. Чтобы отправить массив сообщений, добавьте функцию SendMessageBatch в файл main.go.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch принимает два параметра: срез сообщений и объект azservicebus.Client. Затем он создает новый объект azservicebus.Sender и отправляет сообщения в очередь.

Получение сообщений из очереди

После отправки сообщений в очередь их можно получить с типом azservicebus.Receiver. Чтобы получать сообщения из очереди, добавьте функцию GetMessage в файл main.go.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage принимает объект azservicebus.Client и создает новый объект azservicebus.Receiver. Затем он получает сообщения из очереди. Функция Receiver.ReceiveMessages принимает два параметра: контекст и количество получаемых сообщений. Функция Receiver.ReceiveMessages возвращает срез объектов azservicebus.ReceivedMessage.

Затем цикл for выполняет итерации сообщений и выводит текст сообщения. После этого функция CompleteMessage вызывается для завершения сообщения, удаляя его из очереди.

Сообщения, которые превышают ограничение по длине, отправлены в недопустимую очередь или не были обработаны, ставятся в очередь недоставленных сообщений. Чтобы поставить сообщения в очередь недоставленных сообщений, добавьте функцию SendDeadLetterMessage в файл main.go.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage принимает объект azservicebus.Client и объект azservicebus.ReceivedMessage. После этого сообщение направляется в очередь недоставленных сообщений. Функция принимает два параметра: контекст и объект azservicebus.DeadLetterOptions. Функция Receiver.DeadLetterMessage возвращает ошибку, если сообщение не отправляется в очередь недоставленных сообщений.

Чтобы получить сообщения из очереди недоставленных сообщений, добавьте функцию ReceiveDeadLetterMessage в файл main.go.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage принимает объект azservicebus.Client и создает новый объект azservicebus.Receiver с параметрами для очереди недоставленных сообщений. Затем она получает сообщения из очереди недоставленных сообщений. Затем функция получает одно сообщение из очереди недоставленных сообщений. После этого она выводит причину сбоя передачи сообщения и его описание.

Пример кода

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Выполнение кода

Перед запуском кода создайте переменную среды с именем AZURE_SERVICEBUS_HOSTNAME. Задайте для переменной среды значение пространства имен Служебной шины Azure.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Выполните следующую команду go run, чтобы запустить приложение:

go run main.go

Дальнейшие действия

Дополнительные сведения приведены в следующих статьях: