Проверка с помощью схемы Avro при потоковой передаче событий с помощью пакетов SDK для .NET (AMQP) Центров событий
Из этого краткого руководства вы узнаете, как отправлять события в концентратор событий и получать их из концентратора событий с помощью проверки схемы с помощью библиотеки Azure.Messaging.EventHubs .NET.
Примечание
Реестр схем Azure — это функция Центров событий, которая предоставляет центральный репозиторий для схем для управляемых событиями и ориентированных на обмен сообщениями приложений. Он обеспечивает гибкость работы приложений-производителей и приложений-получателей при обмене данными, избавляя от необходимости управлять схемой и позволяя совместно использовать ее в приложениях обоих типов. Реестр схем также предоставляет простую платформу управления для повторно используемых схем и определяет связь между схемами через конструкцию группировки (группы схем). Дополнительные сведения см. в статье Реестр схем Azure в Центрах событий.
Предварительные требования
Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями, прежде чем приступить к работе с этим руководством.
Для работы с данным руководством необходимо следующее:
- Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начать работу.
-
Microsoft Visual Studio 2022.
Клиентская библиотека для Центров событий Azure использует новые возможности, реализованные в C# версии 8.0. Вы по-прежнему можете использовать библиотеку с предыдущими версиями языка C#, но новый синтаксис недоступен. Чтобы использовать полный синтаксис, рекомендуется выполнять компиляцию с помощью пакета SDK для .NET Core 3.0 или более поздней версии, а для языковой версии должно быть установлено значение
latest
. Если вы используете Visual Studio, учитывайте, что версии до Visual Studio 2019 несовместимы со средствами, необходимыми для сборки проектов C# 8.0. Скачать Visual Studio 2019, в том числе бесплатный выпуск Community, можно здесь.
Создание концентратора событий
Следуйте инструкциям из краткого руководства по созданию пространства имен Центров событий и концентратора событий , чтобы создать пространство имен Центров событий и концентратор событий. Затем следуйте инструкциям из раздела Получение строки подключения , чтобы получить строку подключения к пространству имен Центров событий.
Запишите на указанные ниже параметры, которые будут использоваться в этом кратком руководстве.
- Строка подключения пространства имен Центров событий
- Имя концентратора событий
Создание схемы
С помощью инструкций из статьи о создании схем с помощью реестра схем создайте группу схем и схему.
Создайте группу схем с именем contoso-sg с помощью портала реестра схем. В качестве типа сериализации укажите Avro, в качестве режима совместимости — None (т. е. без совместимости).
В этой группе схем создайте новую схему Avro с именем
Microsoft.Azure.Data.SchemaRegistry.example.Order
и указанным ниже содержимым.{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
Добавление пользователя в роль читателя реестра схем
Добавьте учетную запись пользователя в роль читателя реестра схем на уровне пространства имен. Вы также можете использовать роль Участник реестра схем , но это необязательно для работы с этим кратким руководством.
- На странице Пространство имен Центров событий выберите Управление доступом (IAM) в меню слева.
- На странице Управление доступом (IAM) выберите + Добавить ->Добавить назначение роли в меню.
- На странице Тип назначения нажмите кнопку Далее.
- На странице Роли выберите Читатель реестра схем (предварительная версия) и нажмите кнопку Далее в нижней части страницы.
- Используйте ссылку + Выбрать участников , чтобы добавить учетную запись пользователя в роль, а затем нажмите кнопку Далее.
- На странице Проверка и назначение выберите Проверка и назначение.
Создание событий для концентраторов событий с проверкой схемы
Создание консольного приложения для производителя событий
- Запустите Visual Studio 2019.
- Выберите Создать новый проект.
- В диалоговом окне Создать проект выполните следующие действия: Если это диалоговое окно не отображается, щелкните в меню пункт Файл, затем последовательно выберите Новый и Проект.
Выберите язык программирования C#.
Для типа приложения выберите значение Консоль.
В списке результатов выберите Консольное приложение.
Затем нажмите кнопку Далее.
- Укажите имя проекта OrderProducer и имя решения SRQuickStart, а затем нажмите ОК, чтобы создать проект.
Добавление пакета NuGet для Центров событий
Выберите в меню элементы Инструменты>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.
Выполните следующие команды, чтобы установить Azure.Messaging.EventHubs и другие пакеты NuGet. Нажмите клавишу ВВОД , чтобы выполнить последнюю команду.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Настройке проверку подлинности приложений-производителей для подключения к Azure через Visual Studio, как описано здесь.
Войдите в Azure с помощью учетной записи пользователя, которая является членом
Schema Registry Reader
роли на уровне пространства имен. Сведения о ролях реестра схем см . в разделе Реестр схем Azure в Центрах событий.
Создание кода с помощью схемы Avro
- Используйте то же содержимое, которое использовалось при создании схемы, чтобы создать файл с именем
Order.avsc
. Сохраните файл в папке проекта или решения. - Затем на основе этого файла схемы можно сгенерировать код для .NET. Для создания кода можно использовать любое внешнее средство, например avrogen. Например, можно выполнить команду
avrogen -s .\Order.avsc .
для создания кода. - После создания кода вы увидите файл с именем
Order.cs
в папке\Microsoft\Azure\Data\SchemaRegistry\example
. Для приведенной выше схемы Avro создаются типы C# в пространстве именMicrosoft.Azure.Data.SchemaRegistry.example
. -
Order.cs
Добавьте файл вOrderProducer
проект.
Написание кода для отправки событий в концентратор событий
Добавьте в файл
Program.cs
указанный ниже код. Дополнительные сведения см. в комментариях к коду. Ниже приведены общие шаги в коде.- Создайте клиент производителя, который можно использовать для отправки событий в концентратор событий.
- Создайте клиент реестра схем, который можно использовать для сериализации и проверки данных в объекте
Order
. - Создайте объект
Order
с помощью созданногоOrder
типа. - Используйте клиент реестра схем для сериализации объекта в
Order
EventData
. - Создайте пакет событий.
- Добавьте данные события в пакет событий.
- Используйте клиент производителя для отправки пакета событий в концентратор событий.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. EventHubProducerClient producerClient; // Create a producer client that you can use to send events to an event hub producerClient = new EventHubProducerClient(connectionString, eventHubName); // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Create a new order object using the generated type/class 'Order'. var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." }; EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData)); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add the event data to the event batch. eventBatch.TryAdd(eventData); // Send the batch of events to the event hub. await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 1 order has been published.");
Замените следующие значения заполнителей реальными значениями.
-
EVENTHUBSNAMESPACECONNECTIONSTRING
— строка подключения для пространства имен Центров событий; -
EVENTHUBNAME
— имя концентратора событий; -
EVENTHUBSNAMESPACENAME
— имя пространства имен Центров событий; -
SCHEMAGROUPNAME
— имя группы схем;
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME";
-
Выполните сборку проекта и убедитесь, что она прошла без ошибок.
Выполните программу и дождитесь подтверждающего сообщения.
A batch of 1 order has been published.
Получение событий концентратором событий можно проверить на портале Azure. Переключитесь на представление Сообщения в разделе Метрики. Обновите страницу, чтобы обновить диаграмму. На отображение полученных сообщений может уйти несколько секунд.
Использование событий из концентраторов событий с проверкой схемы
В этом разделе показано, как написать консольное приложение .NET Core, которое получает события из концентратора событий, и использовать реестр схем для десериализации данных событий.
Дополнительные требования
- Создайте учетную запись хранения для использования обработчика событий.
Создание приложения-получателя
- В окне обозревателя решений щелкните правой кнопкой мыши решение SRQuickStart, выберите элемент Добавить и действие Создать проект.
- Выберите Консольное приложение и нажмите Далее.
- Введите OrderConsumer в поле Название проекта и выберите Создать.
- В окне Обозреватель решений щелкните правой кнопкой мыши OrderConsumer и выберите Установить как запускаемый проект.
Добавление пакета NuGet для Центров событий
Выберите в меню элементы Инструменты>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.
В окне Консоли диспетчера пакетов убедитесь, что в качестве проекта по умолчанию выбран OrderConsumer. В противном случае выберите OrderConsumer из раскрывающегося списка.
Выполните следующую команду, чтобы установить необходимые пакеты NuGet. Нажмите клавишу ВВОД , чтобы выполнить последнюю команду.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Настройке проверку подлинности приложений-производителей для подключения к Azure через Visual Studio, как описано здесь.
Войдите в Azure с помощью учетной записи пользователя, которая является членом
Schema Registry Reader
роли на уровне пространства имен. Сведения о ролях реестра схем см . в разделе Реестр схем Azure в Центрах событий.Добавьте файл,
Order.cs
созданный при создании приложения-производителя, в проект OrderConsumer .Щелкните правой кнопкой мыши проект OrderConsumer и выберите Назначить запускаемым проектом.
Написание кода для получения событий и их десериализации с помощью реестра схем
Добавьте в файл
Program.cs
указанный ниже код. Дополнительные сведения см. в комментариях к коду. Ниже приведены общие шаги в коде.- Создайте клиент потребителя, который можно использовать для отправки событий в концентратор событий.
- Создайте клиент контейнера BLOB-объектов для контейнера BLOB-объектов в хранилище BLOB-объектов Azure.
- Создайте клиент обработчика событий и зарегистрируйте обработчики событий и ошибок.
- В обработчике событий создайте клиент реестра схем, который можно использовать для десериализации данных событий в
Order
объект . - Десериализация данных события в
Order
объект с помощью сериализатора. - Вывод сведений о полученном заказе.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // connection string for the Azure Storage account const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // name of the blob container that will be userd as a checkpoint store const string blobContainerName = "BLOBCONTAINERNAME"; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Deserialized data in the received event using the schema Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order)); // Print the received event Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}"); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Замените следующие значения заполнителей реальными значениями.
-
EVENTHUBSNAMESPACE-CONNECTIONSTRING
— строка подключения для пространства имен Центров событий; -
EVENTHUBNAME
— имя концентратора событий; -
EVENTHUBSNAMESPACENAME
— имя пространства имен Центров событий; -
SCHEMAGROUPNAME
— имя группы схем; -
AZURESTORAGECONNECTIONSTRING
— строка подключения для учетной записи хранения Azure. -
BLOBCONTAINERNAME
— Имя контейнера BLOB-объектов
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // Azure storage connection string const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // Azure blob container name const string blobContainerName = "BLOBCONTAINERNAME";
-
Выполните сборку проекта и убедитесь, что она прошла без ошибок.
Запустите приложение получателя.
Отобразится сообщение о том, что события успешно получены.
Received order with ID: 1234, amount: 45.29, description: First sample order.
Здесь отображаются те же три события, которые вы ранее отправили в концентратор событий, выполняя программу отправителя.
Примеры
См. статью Readme в нашем репозитории GitHub.
Очистка ресурсов
Удалите пространство имен Центров событий или группу ресурсов, содержащую это пространство имен.