Azure Service Bus kuyruklarına ileti gönderme ve bu kuyruklardan ileti alma (Go)
Bu öğreticide Go programlama dilini kullanarak Azure Service Bus kuyruklarına ileti göndermeyi ve bu kuyruklardan ileti almayı öğreneceksiniz.
Azure Service Bus, ileti kuyrukları ve yayımlama/abone olma özelliklerine sahip tam olarak yönetilen bir kurumsal ileti aracısıdır. Service Bus, uygulamaları ve hizmetleri birbirinden ayırarak dağıtılmış, güvenilir ve yüksek performanslı bir ileti aktarımı sağlamak için kullanılır.
Go için Azure SDK'nın azservicebus paketi, Azure Service Bus ve Go programlama dilini kullanarak ileti gönderip almanıza olanak tanır.
Bu öğreticinin sonunda şunları yapabileceksiniz: kuyruğa tek bir ileti veya toplu ileti gönderme, iletileri alma ve işlenmemiş teslim edilemeyen iletiler.
Önkoşullar
- Azure aboneliği. Visual Studio veya MSDN abone avantajlarınızı etkinleştirebilir veya ücretsiz bir hesaba kaydolabilirsiniz.
- Çalışmak için bir kuyruğunuz yoksa, kuyruk oluşturmak için Service Bus kuyruğu oluşturmak için Azure portal kullanma makalesindeki adımları izleyin.
- Go sürüm 1.18 veya üzeri
Örnek uygulamayı oluşturma
Başlamak için yeni bir Go modülü oluşturun.
adlı modül
service-bus-go-how-to-use-queues
için yeni bir dizin oluşturun.dizininde
azservicebus
modülü başlatın ve gerekli paketleri yükleyin.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
main.go
adlı yeni bir dosya oluşturun.
Kimlik doğrulaması yapma ve istemci oluşturma
main.go
dosyasında adlı GetClient
yeni bir işlev oluşturun ve aşağıdaki kodu ekleyin:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
işlevi, GetClient
Azure Service Bus ad alanı ve kimlik bilgisi kullanılarak oluşturulan yeni azservicebus.Client
bir nesne döndürür. Ad alanı ortam değişkeni tarafından AZURE_SERVICEBUS_HOSTNAME
sağlanır. Kimlik bilgisi işlevi kullanılarak azidentity.NewDefaultAzureCredential
oluşturulur.
Yerel geliştirme için Azure DefaultAzureCredential
CLI'dan erişim belirteci kullanılmıştır. Bu belirteç, Azure'da kimlik doğrulaması yapmak için komutu çalıştırılarak az login
oluşturulabilir.
İpucu
Bağlantı dizesiyle kimlik doğrulaması yapmak için NewClientFromConnectionString işlevini kullanın.
Kuyruğa ileti gönderme
main.go
dosyasında adlı SendMessage
yeni bir işlev oluşturun ve aşağıdaki kodu ekleyin:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
iki parametre alır: bir ileti dizesi ve bir azservicebus.Client
nesne. Ardından yeni azservicebus.Sender
bir nesne oluşturur ve iletiyi kuyruğa gönderir. Toplu ileti göndermek için işlevini dosyanıza main.go
ekleyinSendMessageBatch
.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
iki parametre alır: bir ileti dilimi ve bir azservicebus.Client
nesne. Ardından yeni azservicebus.Sender
bir nesne oluşturur ve iletileri kuyruğa gönderir.
Kuyruktan ileti alma
Kuyruğa ileti gönderdikten sonra, bu iletileri türüyle azservicebus.Receiver
alabilirsiniz. Kuyruktan ileti almak için işlevini dosyanıza main.go
ekleyinGetMessage
.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
bir azservicebus.Client
nesnesi alır ve yeni azservicebus.Receiver
bir nesne oluşturur. Ardından kuyruktan iletileri alır.
Receiver.ReceiveMessages
İşlev iki parametre alır: bağlam ve alınacak ileti sayısı.
Receiver.ReceiveMessages
işlevi bir nesne dilimi azservicebus.ReceivedMessage
döndürür.
Ardından, bir for
döngü iletiler arasında yinelenir ve ileti gövdesini yazdırır.
CompleteMessage
Ardından, iletiyi kuyruktan kaldırarak tamamlamak için işlevi çağrılır.
Uzunluk sınırlarını aşan, geçersiz bir kuyruğa gönderilen veya başarıyla işlenmemiş iletiler teslim edilemeyen ileti kuyruğuna gönderilebilir. İletileri teslim edilemeyen ileti kuyruğuna göndermek için işlevi dosyanıza main.go
ekleyinSendDeadLetterMessage
.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
bir azservicebus.Client
nesnesi ve bir azservicebus.ReceivedMessage
nesnesi alır. Ardından iletiyi teslim edilemeyen ileti kuyruğuna gönderir. İşlev iki parametre alır: bağlam ve azservicebus.DeadLetterOptions
nesne. İleti Receiver.DeadLetterMessage
teslim edilemeyen ileti kuyruğuna gönderilemiyorsa işlev bir hata döndürür.
Teslim edilemeyen ileti kuyruğundan ileti almak için işlevini dosyanıza main.go
ekleyinReceiveDeadLetterMessage
.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
bir azservicebus.Client
nesnesi alır ve teslim edilemeyen ileti kuyruğu seçenekleriyle yeni azservicebus.Receiver
bir nesne oluşturur. Ardından iletileri teslim edilemeyen ileti kuyruğundan alır. Ardından işlev, teslim edilemeyen ileti kuyruğundan bir ileti alır. Ardından bu iletinin ölü harfinin nedenini ve açıklamasını yazdırır.
Örnek kod
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Kodu çalıştırma
Kodu çalıştırmadan önce adlı AZURE_SERVICEBUS_HOSTNAME
bir ortam değişkeni oluşturun. Ortam değişkeninin değerini Service Bus ad alanına ayarlayın.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Ardından, uygulamayı çalıştırmak için aşağıdaki go run
komutu çalıştırın:
go run main.go
Sonraki adımlar
Daha fazla bilgi için aşağıdaki bağlantıları gözden geçirin: