Snabbstart: Skicka händelser till eller ta emot händelser från Event Hubs med Go
Azure Event Hubs är en strömningstjänst för stordata och händelseinmatningstjänst som kan ta emot och bearbeta flera miljoner händelser per sekund. Event Hubs kan bearbeta och lagra händelser, data eller telemetri som producerats av distribuerade program och enheter. Data som skickas till en händelsehubb kan omvandlas och lagras med valfri provider för realtidsanalys eller batchbearbetnings-/lagringsadapter. En detaljerad översikt över Event Hubs finns i Översikt över Event Hubs och Event Hubs-funktioner.
Den här snabbstarten beskriver hur du skriver Go-program för att skicka händelser till eller ta emot händelser från en händelsehubb.
Kommentar
Den här snabbstarten baseras på exempel på GitHub på https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Avsnittet Skicka händelser baseras på exemplet example_producing_events_test.go och den mottagna baseras på exemplet example_processor_test.go . Koden förenklas för snabbstarten och alla detaljerade kommentarer tas bort, så titta på exemplen för mer information och förklaringar.
Förutsättningar
För att slutföra den här snabbstarten, behöver du följande förhandskrav:
- Gå installerad lokalt. Följ dessa instruktioner om det behövs.
- Ett aktivt Azure-konto. Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.
- Skapa ett Event Hubs-namnområde och en händelsehubb. Använd Azure-portalen för att skapa ett namnområde av typen Event Hubs och hämta de autentiseringsuppgifter för hantering som programmet behöver för att kommunicera med händelsehubben. Om du behöver skapa ett namnområde och en händelsehubb följer du anvisningarna i den här artikeln.
Skicka händelser
Det här avsnittet visar hur du skapar ett Go-program för att skicka händelser till en händelsehubb.
Installera Go-paketet
Hämta Go-paketet för Event Hubs enligt följande exempel.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Kod för att skicka händelser till en händelsehubb
Här är koden för att skicka händelser till en händelsehubb. De viktigaste stegen i koden är:
- Skapa en Event Hubs-producentklient med hjälp av en anslutningssträng till Event Hubs-namnområdet och händelsehubbens namn.
- Skapa ett batchobjekt och lägg till exempelhändelser i batchen.
- Skicka batchen med händelser till händelserna.
Viktigt!
Ersätt NAMESPACE CONNECTION STRING
med anslutningssträng till Event Hubs-namnområdet och EVENT HUB NAME
med namnet på händelsehubben i exempelkoden.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Kör inte programmet än. Du måste först köra mottagarappen och sedan avsändarappen.
Ta emot händelser
Skapa ett lagringskonto och en container
Tillstånd som lån på partitioner och kontrollpunkter i händelserna delas mellan mottagare med hjälp av en Azure Storage-container. Du kan skapa ett lagringskonto och en container med Go SDK, men du kan också skapa ett genom att följa anvisningarna i Om Azure-lagringskonton.
Följ dessa rekommendationer när du använder Azure Blob Storage som kontrollpunktslager:
- Använd en separat container för varje konsumentgrupp. Du kan använda samma lagringskonto, men använda en container per grupp.
- Använd inte containern för något annat och använd inte lagringskontot för något annat.
- Lagringskontot ska finnas i samma region som det distribuerade programmet finns i. Om programmet är lokalt kan du försöka välja den region som är närmast.
På sidan Lagringskonto i Azure-portalen i avsnittet Blob Service kontrollerar du att följande inställningar är inaktiverade.
- Hierarkisk namnrymd
- Mjuk borttagning av blob
- Versionshantering
Go-paket
Om du vill ta emot meddelandena hämtar du Go-paketen för Event Hubs enligt följande exempel.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Kod för att ta emot händelser från en händelsehubb
Här är koden för att ta emot händelser från en händelsehubb. De viktigaste stegen i koden är:
- Kontrollera ett kontrollpunktsarkivobjekt som representerar Azure Blob Storage som används av händelsehubben för kontrollpunkter.
- Skapa en Event Hubs-konsumentklient med hjälp av en anslutningssträng till Event Hubs-namnområdet och händelsehubbens namn.
- Skapa en händelseprocessor med hjälp av klientobjektet och kontrollpunktsarkivobjektet. Processorn tar emot och bearbetar händelser.
- För varje partition i händelsehubben skapar du en partitionsklient med processEvents som funktion för att bearbeta händelser.
- Kör alla partitionsklienter för att ta emot och bearbeta händelser.
Viktigt!
Ersätt följande platshållarvärden med faktiska värden:
AZURE STORAGE CONNECTION STRING
med anslutningssträng för ditt Azure Storage-kontoBLOB CONTAINER NAME
med namnet på blobcontainern som du skapade i lagringskontotNAMESPACE CONNECTION STRING
med anslutningssträng för event hubs-namnområdetEVENT HUB NAME
med händelsehubbens namn i exempelkoden.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Köra mottagar- och avsändarappar
Kör mottagarappen först.
Kör avsändarappen.
Vänta en minut för att se följande utdata i mottagarfönstret.
Processing 2 event(s) Event received with body hello Event received with body world
Nästa steg
Se exempel på GitHub på https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.