Проверка схем для приложений Apache Kafka с помощью Avro (Java)

В этом кратком руководстве мы рассмотрим, как проверить событие из приложений Apache Kafka с помощью реестра схем Azure для Центров событий.

В этом случае приложение производителя Kafka использует схему Avro, хранящуюся в реестре схем Azure, для сериализации события и публикации их в центре событий Kafka в Центры событий Azure. Потребитель Kafka десериализирует события, которые он потребляет из Центров событий. Для этого он использует идентификатор схемы события и схемы Avro, которая хранится в реестре схем Azure.

Diagram showing schema serialization/de-serialization for Kafka applications using Avro schema.

Необходимые компоненты

Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями, прежде чем приступить к работе с этим руководством.

Для работы с данным руководством необходимо следующее:

Создание концентратора событий

Следуйте инструкциям из краткого руководства. Создание пространства имен Центров событий и концентратора событий для создания пространства имен Центров событий и концентратора событий. Затем следуйте инструкциям из получения строка подключения, чтобы получить строка подключения в пространство имен Центров событий.

Запишите следующие параметры, которые вы используете в текущем кратком руководстве.

  • Строка подключения пространства имен Центров событий
  • Имя концентратора событий

Создание схемы

Следуйте инструкциям из руководства по созданию схем с помощью реестра схем, чтобы создать группу схем и схему.

  1. Создайте группу схем с именем contoso-sg с помощью портала реестра схем. В качестве типа сериализации укажите Avro, в качестве режима совместимости — None (т. е. без совместимости).

  2. В этой группе схем создайте новую схему Avro с именем Microsoft.Azure.Data.SchemaRegistry.example.Order и указанным ниже содержимым.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Регистрация приложения для доступа к реестру схем

Идентификатор Microsoft Entra можно использовать для авторизации производителя и клиентского приложения Kafka для доступа к ресурсам реестра схем Azure, зарегистрируя клиентское приложение в клиенте Microsoft Entra из портал Azure.

Чтобы зарегистрировать приложение Microsoft Entra с именем example-app , см. статью "Регистрация приложения с помощью клиента Microsoft Entra".

  • tenant.id — задает идентификатор клиента приложения.
  • client.id — задает идентификатор клиента приложения.
  • client.secret — задает секрет клиента для проверки подлинности.

Если вы используете управляемое удостоверение, вам потребуется:

  • use.managed.identity.credential — указывает, что учетные данные MSI следует использовать для виртуальной машины с поддержкой MSI.
  • managed.identity.clientId — если задано, он создает учетные данные MSI с заданным идентификатором клиента.
  • managed.identity.resourceId — если задано, он создает учетные данные MSI с указанным идентификатором ресурса.

Добавление пользователя в роль читателя реестра схем

Добавьте учетную запись пользователя в роль читателя реестра схем на уровне пространства имен. Вы также можете использовать роль участника реестра схем, но это не обязательно для этого краткого руководства.

  1. На странице пространства имен Центров событий выберите элемент управления доступом (IAM) в меню слева.
  2. На странице управления доступом (IAM) выберите +Добавить -Добавить> назначение ролей в меню.
  3. На странице "Тип назначения" нажмите кнопку "Далее".
  4. На странице "Роли" выберите средство чтения реестра схем (предварительная версия) и нажмите кнопку "Далее" в нижней части страницы.
  5. Используйте ссылку +Выбрать участников , чтобы добавить example-app приложение, созданное на предыдущем шаге, в роль, а затем нажмите кнопку "Далее".
  6. На странице "Рецензирование и назначение" выберите "Рецензирование и назначение".

Обновление конфигурации клиентского приложения kafka

Необходимо обновить клиентскую конфигурацию приложений производителя и потребителей Kafka с конфигурацией, связанной с созданным приложением Microsoft Entra, и сведениями реестра схем.

Чтобы обновить конфигурацию производителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer.

  1. Обновите конфигурацию приложения Kafka в src/main/resources/app.properties , следуя руководству по краткому руководству по Kafka для Центров событий.

  2. Обновите сведения о конфигурации производителя в src/main/resources/app.properties с помощью связанной конфигурации реестра схем и приложения Microsoft Entra, созданного выше, следующим образом:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. Следуйте тем же инструкциям и обновите конфигурацию azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer .

  4. Для приложений производителя и потребителей Kafka используется следующая схема Avro:

    {
      "namespace": "com.azure.schemaregistry.samples",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    }
    

Использование производителя Kafka с проверкой схемы Avro

Чтобы запустить приложение производителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer.

  1. Вы можете запустить приложение-производитель, чтобы он смог создавать определенные записи Avro или универсальные записи. Для конкретного режима записей необходимо сначала создать классы для любой схемы производителя с помощью следующей команды maven:

    mvn generate-sources
    
  2. Затем можно запустить приложение производителя с помощью следующих команд.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. После успешного выполнения приложения производителя он предложит выбрать сценарий производителя. В этом кратком руководстве можно выбрать вариант 1— создать Avro SpecificRecords.

    Enter case number:
    1 - produce Avro SpecificRecords
    2 - produce Avro GenericRecords
    
  4. После успешной сериализации и публикации данных в приложении производителя должны появиться следующие журналы консоли:

    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

Использование потребителя Kafka с проверкой схемы Avro

Чтобы запустить приложение потребителя Kafka, перейдите к azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer.

  1. Вы можете запустить приложение-получатель, чтобы оно потребляло определенные записи или универсальные записи Avro. Для конкретного режима записей необходимо сначала создать классы для любой схемы производителя с помощью следующей команды maven:

    mvn generate-sources
    
  2. Затем можно запустить приложение-получатель с помощью следующей команды.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. После успешного выполнения приложения-получателя он предложит выбрать сценарий производителя. В этом кратком руководстве можно выбрать вариант 1— использовать Avro SpecificRecords.

    Enter case number:
    1 - consume Avro SpecificRecords
    2 - consume Avro GenericRecords
    
  4. После успешного потребления и десериализации данных в приложении производителя должны появиться следующие журналы консоли:

    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"}
    INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
    

Очистка ресурсов

Удалите пространство имен Центров событий или удалите группу ресурсов, содержащую пространство имен.