Ověření pomocí schématu Avro při streamování událostí pomocí sad SDK (AMQP) služby Event Hubs

V tomto rychlém startu se dozvíte, jak odesílat události do centra událostí a přijímat je z centra událostí s ověřováním schématu pomocí knihovny Azure.Messaging.EventHubs .NET.

Poznámka

Azure Schema Registry je funkce služby Event Hubs, která poskytuje centrální úložiště pro schémata pro aplikace řízené událostmi a aplikace zaměřené na zasílání zpráv. Poskytuje flexibilitu pro aplikace pro producenty a příjemce při výměně dat, aniž by bylo nutné spravovat a sdílet schéma. Poskytuje také jednoduchou architekturu zásad správného řízení pro opakovaně použitelná schémata a definuje vztah mezi schématy prostřednictvím konstruktoru seskupení (skupin schémat). Další informace najdete v tématu Azure Schema Registry ve službě Event Hubs.

Požadavky

Pokud s Azure Event Hubs teprve začínáte, přečtěte si téma Přehled služby Event Hubs před provedením tohoto rychlého startu.

K dokončení tohoto rychlého startu potřebujete následující požadavky:

  • Pokud nemáte předplatné Azure, vytvořte si před zahájením bezplatného účtu .
  • Microsoft Visual Studio 2022. Klientská knihovna Azure Event Hubs využívá nové funkce, které byly zavedeny v jazyce C# 8.0. Knihovnu můžete dál používat s předchozími verzemi jazyka C#, ale nová syntaxe není dostupná. Pokud chcete využít úplnou syntaxi, doporučujeme zkompilovat ji se sadou .NET Core SDK 3.0 nebo novější a jazykovou verzí nastavenou na latest. Pokud používáte Visual Studio, verze starší než Visual Studio 2019 nejsou kompatibilní s nástroji potřebnými k vytváření projektů C# 8.0. Visual Studio 2019, včetně bezplatné edice Community, si můžete stáhnout tady.

Vytvoření centra událostí

Postupujte podle pokynů z rychlého startu: Vytvoření oboru názvů služby Event Hubs a centra událostí a vytvořte obor názvů služby Event Hubs a centrum událostí. Pak postupujte podle pokynů v tématu Získání připojovacího řetězce a získejte připojovací řetězec k oboru názvů služby Event Hubs.

Poznamenejte si následující nastavení, která použijete v aktuálním rychlém startu:

  • Připojovací řetězec pro obor názvů služby Event Hubs
  • Název centra událostí

Vytvoření schématu

Postupujte podle pokynů v tématu Vytváření schémat pomocí registru schémat a vytvořte skupinu schémat a schéma.

  1. Pomocí portálu registru schémat vytvořte skupinu schémat s názvem contoso-sg . Jako typ serializace použijte Avro a pro režim kompatibility žádný .

  2. V této skupině schématu vytvořte nové schéma Avro s názvem schématu: Microsoft.Azure.Data.SchemaRegistry.example.Order pomocí následujícího obsahu schématu.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Přidání uživatele do role Čtenář registru schématu

Přidejte svůj uživatelský účet do role Čtenář registru schémat na úrovni oboru názvů. Můžete také použít roli Přispěvatel registru schémat , ale pro tento rychlý start to není nutné.

  1. Na stránce Obor názvů služby Event Hubs v nabídce vlevo vyberte Řízení přístupu (IAM).
  2. Na stránce Řízení přístupu (IAM) vyberte v nabídce + Přidat ->Přidat přiřazení role .
  3. Na stránce Typ přiřazení vyberte Další.
  4. Na stránce Role vyberte Čtenář registru schémat (Preview) a pak v dolní části stránky vyberte Další .
  5. Pomocí odkazu + Vybrat členy přidejte svůj uživatelský účet do role a pak vyberte Další.
  6. Na stránce Zkontrolovat a přiřadit vyberte Zkontrolovat a přiřadit.

Vytváření událostí ve službě Event Hubs pomocí ověřování schématu

Vytvoření konzolové aplikace pro producenta událostí

  1. Spusťte Visual Studio 2019.
  2. Vyberte Vytvořit nový projekt.
  3. V dialogovém okně Vytvořit nový projekt proveďte následující kroky: Pokud toto dialogové okno nevidíte, vyberte v nabídce Soubor , vyberte Nový a pak vyberte Projekt.
    1. Jako programovací jazyk vyberte C# .

    2. Jako typ aplikace vyberte Konzola .

    3. V seznamu výsledků vyberte Konzolová aplikace .

    4. Pak vyberte Další.

      Obrázek znázorňující dialogové okno Nový projekt

  4. Jako název projektu zadejte OrderProducer , jako název řešení SRQuickStart a pak vyberte OK a vytvořte projekt.

Přidání balíčku NuGet služby Event Hubs

  1. V nabídce vyberte Nástroje> Správce >balíčků NuGetKonzola správce balíčků.

  2. Spuštěním následujících příkazů nainstalujte Azure.Messaging.EventHubs a další balíčky NuGet. Stisknutím klávesy ENTER spusťte poslední příkaz.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Ověřte aplikace producenta pro připojení k Azure prostřednictvím sady Visual Studio, jak je znázorněno tady.

  4. Přihlaste se k Azure pomocí uživatelského účtu, který je členem Schema Registry Reader role na úrovni oboru názvů. Informace o rolích registru schématu najdete v tématu Azure Schema Registry ve službě Event Hubs.

Generování kódu pomocí schématu Avro

  1. Stejný obsah, který jste použili k vytvoření schématu, použijte k vytvoření souboru s názvem Order.avsc. Uložte soubor do složky projektu nebo řešení.
  2. Pak můžete tento soubor schématu použít ke generování kódu pro .NET. Pro generování kódu můžete použít jakýkoli externí nástroj pro generování kódu, například avrogen . Spuštěním příkazu můžete například vygenerovat avrogen -s .\Order.avsc . kód.
  3. Po vygenerování kódu uvidíte ve \Microsoft\Azure\Data\SchemaRegistry\example složce soubor s názvem Order.cs . Pro výše uvedené schéma Avro vygeneruje typy jazyka C# v Microsoft.Azure.Data.SchemaRegistry.example oboru názvů.
  4. Order.cs Přidejte soubor do OrderProducer projektu.

Psaní kódu pro serializaci a odesílání událostí do centra událostí

  1. Do souboru Program.cs přidejte následující kód. Podrobnosti najdete v komentářích ke kódu. Základní kroky v kódu jsou:

    1. Vytvořte klienta producenta, který můžete použít k odesílání událostí do centra událostí.
    2. Vytvořte klienta registru schématu, který můžete použít k serializaci a ověření dat v objektu Order .
    3. Vytvořte nový Order objekt pomocí vygenerovaného Order typu.
    4. Použijte klienta registru schématu k serializaci objektu Order na EventData.
    5. Vytvořte dávku událostí.
    6. Přidejte data události do dávky událostí.
    7. Pomocí klienta producenta odešlete dávku událostí do centra událostí.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Následující zástupné hodnoty nahraďte skutečnými hodnotami.

    • EVENTHUBSNAMESPACECONNECTIONSTRING – připojovací řetězec pro obor názvů služby Event Hubs
    • EVENTHUBNAME – název centra událostí
    • EVENTHUBSNAMESPACENAME – název oboru názvů služby Event Hubs
    • SCHEMAGROUPNAME – název skupiny schématu
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Sestavte projekt a ujistěte se, že nedošlo k žádným chybám.

  4. Spusťte program a počkejte na potvrzovací zprávu.

    A batch of 1 order has been published.
    
  5. V Azure Portal můžete ověřit, že centrum událostí přijalo události. V části Metriky přepněte do zobrazení Zprávy. Aktualizujte stránku a aktualizujte graf. Může trvat několik sekund, než se zobrazí, že zprávy byly přijaty.

    Obrázek stránky Azure Portal pro ověření, že centrum událostí přijalo události.

Využívání událostí ze služby Event Hubs s využitím ověřování schématu

Tato část ukazuje, jak napsat konzolovou aplikaci .NET Core, která přijímá události z centra událostí, a pomocí registru schématu deserializovat data událostí.

Další požadavky

  • Vytvořte účet úložiště pro použití procesoru událostí.

Vytvoření aplikace příjemce

  1. V okně Průzkumník řešení klikněte pravým tlačítkem na řešení SRQuickStart, přejděte na Přidat a vyberte Nový projekt.
  2. Vyberte Konzolová aplikace a vyberte Další.
  3. Jako název projektu zadejte OrderConsumer a vyberte Vytvořit.
  4. V okně Průzkumník řešení klikněte pravým tlačítkem na OrderConsumer a vyberte Nastavit jako spouštěný projekt.

Přidání balíčku NuGet služby Event Hubs

  1. V nabídce vyberte Nástroje> Správce >balíčků NuGetKonzola správce balíčků.

  2. V okně Konzola Správce balíčků ověřte, že je pro výchozí projekt vybraná možnost OrderConsumer. Pokud ne, pomocí rozevíracího seznamu vyberte OrderConsumer.

  3. Spuštěním následujícího příkazu nainstalujte požadované balíčky NuGet. Stisknutím klávesy ENTER spusťte poslední příkaz.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Ověřte aplikace producenta pro připojení k Azure prostřednictvím sady Visual Studio, jak je znázorněno tady.

  5. Přihlaste se k Azure pomocí uživatelského účtu, který je členem Schema Registry Reader role na úrovni oboru názvů. Informace o rolích registru schématu najdete v tématu Azure Schema Registry ve službě Event Hubs.

  6. Přidejte soubor, Order.cs který jste vygenerovali při vytváření aplikace producenta, do projektu OrderConsumer .

  7. Klikněte pravým tlačítkem na OrderConsumer project (Projekt OrderConsumer ) a vyberte Set as Startup project (Nastavit jako projekt po spuštění).

Psaní kódu pro příjem událostí a jejich deserializace pomocí registru schématu

  1. Do souboru Program.cs přidejte následující kód. Podrobnosti najdete v komentářích ke kódu. Základní kroky v kódu jsou:

    1. Vytvořte klienta příjemce, který můžete použít k odesílání událostí do centra událostí.
    2. Vytvořte klienta kontejneru objektů blob pro kontejner objektů blob ve službě Azure Blob Storage.
    3. Vytvořte klienta procesoru událostí a zaregistrujte obslužné rutiny událostí a chyb.
    4. V obslužné rutině události vytvořte klienta registru schématu, který můžete použít k deserializaci dat událostí do objektu Order .
    5. Deserializovat data událostí do objektu Order pomocí serializátoru.
    6. Vytiskněte informace o přijaté objednávce.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Následující zástupné hodnoty nahraďte skutečnými hodnotami.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING – připojovací řetězec pro obor názvů služby Event Hubs
    • EVENTHUBNAME – název centra událostí
    • EVENTHUBSNAMESPACENAME – název oboru názvů služby Event Hubs
    • SCHEMAGROUPNAME – název skupiny schématu
    • AZURESTORAGECONNECTIONSTRING – připojovací řetězec pro účet úložiště Azure
    • BLOBCONTAINERNAME – Název kontejneru objektů blob
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Sestavte projekt a ujistěte se, že nedošlo k žádným chybám.

  4. Spusťte aplikaci příjemce.

  5. Měla by se zobrazit zpráva o přijetí událostí.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Tyto události jsou tři události, které jste dříve odeslali do centra událostí spuštěním programu odesílatele.

Ukázky

Přečtěte si článek Readme v našem úložišti GitHub.

Vyčištění prostředků

Odstraňte obor názvů služby Event Hubs nebo odstraňte skupinu prostředků, která obsahuje obor názvů.

Další kroky