Apache Kafka アプリケーションで JSON スキーマを使用する

このチュートリアルでは、Event Hubs の Azure スキーマ レジストリを使用して、JSON スキーマでイベントをシリアル化および逆シリアル化するシナリオについて説明します。

このユース ケースの場合、Kafka プロデューサー アプリケーションでは、Azure スキーマ レジストリに保存されている JSON スキーマを使用して、イベントをシリアル化し、Azure Event Hubs の Kafka トピック/イベント ハブに発行します。 Kafka コンシューマーでは、それが使用する、Event Hubs からのイベントを逆シリアル化します。 そのために、イベントと JSON スキーマのスキーマ ID が使用されます。この ID は Azure スキーマ レジストリに保存されています。 JSON スキーマを使用した Kafka アプリケーションのスキーマのシリアル化と逆シリアル化を示す図。

前提条件

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。

このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。

イベント ハブの作成

Event Hubs 名前空間とイベント ハブを作成する」のクイックスタートの手順に従って、Event Hubs 名前空間とイベント ハブを作成します。 次に、接続文字列を取得する方法に関するページの手順に従って、使用している Event Hubs 名前空間への接続文字列を取得します。

現在のクイックスタートで使用する次の設定をメモします。

  • Event Hubs 名前空間の接続文字列
  • イベント ハブの名前

スキーマの作成

スキーマ レジストリを使用してスキーマを作成する方法に関するページの手順に従って、スキーマ グループとスキーマを作成します。

  1. スキーマ レジストリ ポータルを使用して、contoso-sg という名前のスキーマ グループを作成します。 シリアル化の種類として "JSON スキーマ" を使用します。

  2. そのスキーマ グループで、次のスキーマ コンテンツを使用して、"Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice" というスキーマ名の新しい JSON スキーマを作成します。

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    } 
    

スキーマ レジストリにアクセスするアプリケーションを登録する

Microsoft Entra ID を使って、Kafka のプロデューサー アプリケーションとコンシューマー アプリケーションが Azure スキーマ レジストリ リソースにアクセスするのを認可できます。 これを有効にするには、Azure portal からクライアント アプリケーションを Microsoft Entra テナントに登録する必要があります。

example-app という名前の Microsoft Entra アプリケーションの登録については、「アプリケーションを Microsoft Entra テナントに登録する」をご覧ください。

  • tenant.id - アプリケーションのテナント ID を設定します
  • client.id - アプリケーションのクライアント ID を設定します
  • client.secret - 認証用のクライアント シークレットを設定します

さらに、マネージド ID を使用している場合は、以下が必要になります。

  • use.managed.identity.credential - MSI 資格情報を使用する必要があることを示します。MSI 対応 VM に使用してください
  • managed.identity.clientId - 指定した場合、指定されたクライアント ID を使用して MSI 資格情報を作成します。managed.identity.resourceId - 指定した場合、指定されたリソース ID を使用して MSI 資格情報を作成します

スキーマ レジストリ閲覧者ロールにユーザーを追加する

名前空間レベルでスキーマ レジストリ閲覧者ロールにユーザー アカウントを追加します。 スキーマ レジストリ共同作成者ロールを使用することもできますが、このクイックスタートでは必要ありません。

  1. [Event Hubs 名前空間] ページで、左側のメニューの [アクセス制御 (IAM)] を選択します。
  2. [アクセス制御 (IAM)] ページのメニューで、[+ 追加] ->[ロールの割り当てを追加] を選択します。
  3. [割り当ての種類] ページで、[次へ] を選択します。
  4. [ロール] ページで、[スキーマ レジストリ閲覧者] を選択し、ページの下部にある [次へ] を選択します。
  5. [+ Select members] (+ メンバーを選択する) リンクを使用して、前の手順で作成した example-app アプリケーションをロールに追加し、[次へ] を選択します。
  6. [確認と割り当て] ページで、[確認と割り当て] を選択します。

Kafka アプリケーションのクライアント アプリケーション構成を更新する

Kafka のプロデューサー アプリケーションとコンシューマー アプリケーションのクライアント構成を、Microsoft Entra アプリケーションの詳細とスキーマ レジストリの情報を使って更新する必要があります。

Kafka プロデューサーの構成を更新するには、azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer に移動します。

  1. Event Hubs の Kafka クイック スタート ガイドに従って、src/main/resources/app.properties で Kafka アプリケーションの構成を更新します。

  2. スキーマ レジストリ関連の構成と、前のステップで作成した Microsoft Entra アプリケーションを使って、src/main/resources/app.properties にあるプロデューサーの構成の詳細を、次のように更新します。

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. 同じ手順に従って、azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer の構成も更新します。

  4. Kafka プロデューサー アプリケーションとコンシューマー アプリケーションのどちらにも、次の JSON スキーマが使用されます。

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    }
    

JSON スキーマ検証で Kafka プロデューサーを使用する

Kafka プロデューサー アプリケーションを実行するには、azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer に移動します。

  1. プロデューサー アプリケーションを実行すると、JSON スキーマ固有のレコードまたは汎用レコードをそのアプリケーションで生成できます。 特定のレコードのモードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。

    mvn generate-sources
    
  2. その後に、次のコマンドを使用してプロデューサー アプリケーションを実行できます。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. プロデューサー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイックスタートでは、オプション 1 - produce SpecificRecords を選択できます。

    Enter case number:
    1 - produce SpecificRecords
    
  4. データのシリアル化と発行が正常に行われると、プロデューサー アプリケーションに次のコンソール ログが表示されます。

    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
    

JSON スキーマ検証で Kafka コンシューマーを使用する

Kafka コンシューマー アプリケーションを実行するには、azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer に移動します。

  1. コンシューマー アプリケーションを実行すると、JSON スキーマ固有のレコードまたは汎用レコードをそのアプリケーションで使用できます。 特定のレコードのモードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。

    mvn generate-sources
    
  2. その後に、次のコマンドを使用してコンシューマー アプリケーションを実行できます。

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. コンシューマー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイックスタートでは、オプション "1 - consume SpecificRecords" を選びます。

    Enter case number:
    1 - consume SpecificRecords
    
  4. データの使用と逆シリアル化が正常に行われると、プロデューサー アプリケーションに次のコンソール ログが表示されます。

    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
    
    

リソースをクリーンアップする

Event Hubs 名前空間を削除するか、名前空間を含むリソース グループを削除します。