Inviare e ricevere messaggi da code di bus di servizio di Azure (Go)

In questa esercitazione si apprenderà come inviare e ricevere messaggi da bus di servizio di Azure code usando il linguaggio di programmazione Go.

bus di servizio di Azure è un broker di messaggi aziendale completamente gestito con code di messaggi e funzionalità di pubblicazione/sottoscrizione. Il bus di servizio viene usato per separare applicazioni e servizi l'uno dall'altro, fornendo un trasporto di messaggi distribuito, affidabile e ad alte prestazioni.

Il pacchetto azservicebus di Azure SDK per Go consente di inviare e ricevere messaggi da bus di servizio di Azure e di usare il linguaggio di programmazione Go.

Al termine di questa esercitazione, sarà possibile inviare un singolo messaggio o batch di messaggi a una coda, ricevere messaggi e messaggi non recapitabili non elaborati.

Prerequisiti

Creare l'app di esempio

Per iniziare, creare un nuovo modulo Go.

  1. Creare una nuova directory per il modulo denominato service-bus-go-how-to-use-queues.

  2. azservicebus Nella directory inizializzare il modulo e installare i pacchetti necessari.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Creare un file denominato main.go.

Autenticare e creare un client

main.go Nel file creare una nuova funzione denominata GetClient e aggiungere il codice seguente:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

La GetClient funzione restituisce un nuovo azservicebus.Client oggetto creato usando uno spazio dei nomi bus di servizio di Azure e una credenziale. Lo spazio dei nomi viene fornito dalla AZURE_SERVICEBUS_HOSTNAME variabile di ambiente. E le credenziali vengono create usando la azidentity.NewDefaultAzureCredential funzione .

Per lo sviluppo locale, è DefaultAzureCredential stato usato il token di accesso dall'interfaccia della riga di comando di Azure, che può essere creato eseguendo il az login comando per l'autenticazione in Azure.

Suggerimento

Per eseguire l'autenticazione con una stringa di connessione, usare la funzione NewClientFromConnectionString .

Inviare messaggi a una coda

main.go Nel file creare una nuova funzione denominata SendMessage e aggiungere il codice seguente:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage accetta due parametri: una stringa di messaggio e un azservicebus.Client oggetto . Crea quindi un nuovo azservicebus.Sender oggetto e invia il messaggio alla coda. Per inviare messaggi in blocco, aggiungere la SendMessageBatch funzione al main.go file.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch accetta due parametri: una sezione di messaggi e un azservicebus.Client oggetto . Crea quindi un nuovo azservicebus.Sender oggetto e invia i messaggi alla coda.

Ricevere messaggi da una coda

Dopo aver inviato messaggi alla coda, è possibile riceverli con il azservicebus.Receiver tipo . Per ricevere messaggi da una coda, aggiungere la GetMessage funzione al main.go file.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage accetta un azservicebus.Client oggetto e crea un nuovo azservicebus.Receiver oggetto. Riceve quindi i messaggi dalla coda. La Receiver.ReceiveMessages funzione accetta due parametri: un contesto e il numero di messaggi da ricevere. La Receiver.ReceiveMessages funzione restituisce una sezione di azservicebus.ReceivedMessage oggetti.

Successivamente, un for ciclo scorre i messaggi e stampa il corpo del messaggio. Viene quindi chiamata la CompleteMessage funzione per completare il messaggio, rimuovendolo dalla coda.

I messaggi che superano i limiti di lunghezza, vengono inviati a una coda non valida o non vengono elaborati correttamente possono essere inviati alla coda dei messaggi non recapitabili. Per inviare messaggi alla coda dei messaggi non recapitabili, aggiungere la SendDeadLetterMessage funzione al main.go file.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage accetta un azservicebus.Client oggetto e un azservicebus.ReceivedMessage oggetto . Invia quindi il messaggio alla coda dei messaggi non recapitabili. La funzione accetta due parametri: un contesto e un azservicebus.DeadLetterOptions oggetto . La Receiver.DeadLetterMessage funzione restituisce un errore se il messaggio non riesce a essere inviato alla coda dei messaggi non recapitabili.

Per ricevere messaggi dalla coda dei messaggi non recapitabili, aggiungere la ReceiveDeadLetterMessage funzione al main.go file.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage accetta un azservicebus.Client oggetto e crea un nuovo azservicebus.Receiver oggetto con opzioni per la coda dei messaggi non recapitabili. Riceve quindi i messaggi dalla coda dei messaggi non recapitabili. La funzione riceve quindi un messaggio dalla coda dei messaggi non recapitabili. Stampa quindi il motivo e la descrizione dei messaggi non recapitabili per tale messaggio.

Codice di esempio

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Eseguire il codice

Prima di eseguire il codice, creare una variabile di ambiente denominata AZURE_SERVICEBUS_HOSTNAME. Impostare il valore della variabile di ambiente sullo spazio dei nomi del bus di servizio.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Eseguire quindi il comando seguente go run per eseguire l'app:

go run main.go

Passaggi successivi

Per altre informazioni, vedere i collegamenti seguenti: