Avro (Java) を使用して Apache Kafka アプリケーションのスキーマを検証する
このクイックスタート ガイドでは、Event Hubs 用の Azure スキーマ レジストリを使用して Apache Kafka アプリケーションからのイベントを検証する方法について説明します。
このユース ケースの場合、Kafka プロデューサー アプリケーションでは、Azure スキーマ レジストリに保存されている Avro スキーマを使用して、イベントをシリアル化し、Azure Event Hubs の Kafka トピックまたはイベント ハブに発行します。 Kafka コンシューマーでは、それが使用する、Event Hubs からのイベントが逆シリアル化されます。 そのために、イベントと Avro スキーマのスキーマ ID が使用されます。この ID は Azure スキーマ レジストリに保存されています。
前提条件
Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。
このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。
- Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。
- 開発環境で、次のコンポーネントをインストールします。
- Java Development Kit (JDK) 1.7 以降
- Maven バイナリ アーカイブのダウンロードとインストール
- Git
- Kafka 用 Azure スキーマ レジストリ リポジトリをクローンします。
イベント ハブの作成
「Event Hubs 名前空間とイベント ハブを作成する」のクイックスタートの手順に従って、Event Hubs 名前空間とイベント ハブを作成します。 次に、接続文字列を取得する方法に関するページの手順に従って、使用している Event Hubs 名前空間への接続文字列を取得します。
現在のクイックスタートで使用する次の設定をメモします。
- Event Hubs 名前空間の接続文字列
- イベント ハブの名前
スキーマの作成
スキーマ レジストリを使用してスキーマを作成する方法に関するページの手順に従って、スキーマ グループとスキーマを作成します。
スキーマ レジストリ ポータルを使用して、contoso-sg という名前のスキーマ グループを作成します。 シリアル化の種類として Avro を使用し、互換性モードに [なし] を使用します。
そのスキーマ グループで、次のスキーマ コンテンツを使用して、スキーマ名
Microsoft.Azure.Data.SchemaRegistry.example.Order
を使用して新しい Avro スキーマを作成します。{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
スキーマ レジストリにアクセスするアプリケーションを登録する
Microsoft Entra ID を使用して、クライアント アプリケーションを Azure portal から Microsoft Entra テナントに登録することで、Kafka プロデューサーおよびコンシューマー アプリケーションによる Azure スキーマ レジストリ リソースへのアクセスを承認できます。
example-app
という名前の Microsoft Entra アプリケーションの登録については、「アプリケーションを Microsoft Entra テナントに登録する」をご覧ください。
- tenant.id - アプリケーションのテナント ID を設定します
- client.id - アプリケーションのクライアント ID を設定します
- client.secret - 認証用のクライアント シークレットを設定します
さらに、マネージド ID を使用している場合は、以下が必要になります。
- use.managed.identity.credential - MSI 資格情報を使用する必要があることを示します。MSI 対応 VM に使用してください
- managed.identity.clientId - 指定した場合は、指定されたクライアント ID を使用して MSI 資格情報が構築されます
- managed.identity.clientId - 指定した場合は、指定されたリソース ID を使用して MSI 資格情報が構築されます
スキーマ レジストリ閲覧者ロールにユーザーを追加する
名前空間レベルでスキーマ レジストリ閲覧者ロールにユーザー アカウントを追加します。 スキーマ レジストリ共同作成者ロールを使用することもできますが、このクイックスタートでは必要ありません。
- [Event Hubs 名前空間] ページで、左側のメニューの [アクセス制御 (IAM)] を選択します。
- [アクセス制御 (IAM)] ページのメニューで、[+ 追加] ->[ロールの割り当てを追加] を選択します。
- [割り当ての種類] ページで、[次へ] を選択します。
- [ロール] ページで、[スキーマ レジストリ閲覧者 (プレビュー)] を選択し、ページの下部にある [次へ] を選択します。
- [+ Select members] (+ メンバーを選択する) リンクを使用して、前の手順で作成した
example-app
アプリケーションをロールに追加し、[次へ] を選択します。 - [確認と割り当て] ページで、[確認と割り当て] を選択します。
Kafka アプリケーションのクライアント アプリケーション構成を更新する
Kafka プロデューサー アプリケーションとコンシューマー アプリケーションのクライアント構成を、作成した Microsoft Entra アプリケーションに関連した構成とスキーマ レジストリの情報を使用して更新する必要があります。
Kafka プロデューサーの構成を更新するには、azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer に移動します。
Event Hubs の Kafka クイック スタート ガイドに従って、src/main/resources/app.properties で Kafka アプリケーションの構成を更新します。
スキーマ レジストリ関連の構成と上記で作成した Microsoft Entra アプリケーションを使用して、src/main/resources/app.properties のプロデューサーの構成の詳細を、次のように更新します。
schema.group=contoso-sg schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net tenant.id=<> client.id=<> client.secret=<>
同じ手順に従って、azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer の構成も更新します。
Kafka プロデューサー アプリケーションとコンシューマー アプリケーションのどちらにも、次の Avro スキーマが使用されます。
{ "namespace": "com.azure.schemaregistry.samples", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
Avro スキーマ検証で Kafka プロデューサーを使用する
Kafka プロデューサー アプリケーションを実行するには、azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-producer に移動します。
プロデューサー アプリケーションを実行すると、Avro 固有のレコードまたは汎用レコードをそのアプリケーションで生成できます。 特定のレコードのモードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。
mvn generate-sources
その後に、次のコマンドを使用してプロデューサー アプリケーションを実行できます。
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
プロデューサー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイックスタートでは、オプション 1 - produce SpecificRecords を選択します。
Enter case number: 1 - produce Avro SpecificRecords 2 - produce Avro GenericRecords
データのシリアル化と発行が正常に行われると、プロデューサー アプリケーションに次のコンソール ログが表示されます。
INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"} INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"} INFO com.azure.schemaregistry.samples.producer.KafkaAvroSpecificRecord - Sent Order {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
Avro スキーマ検証で Kafka コンシューマーを使用する
Kafka コンシューマー アプリケーションを実行するには、azure-schema-registry-for-kafka/tree/master/java/avro/samples/kafka-consumer に移動します。
コンシューマー アプリケーションを実行すると、Avro 固有のレコードまたは汎用レコードをそのアプリケーションで使用できます。 特定のレコードのモードでは、まず、次の maven コマンドを使用して、プロデューサー スキーマに対してクラスを生成する必要があります。
mvn generate-sources
その後に、次のコマンドを使用してコンシューマー アプリケーションを実行できます。
mvn clean package mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
コンシューマー アプリケーションが正常に実行されると、プロデューサー シナリオを選択するように求められます。 このクイックスタートでは、オプション 1 - consume Avro SpecificRecords を選択します。
Enter case number: 1 - consume Avro SpecificRecords 2 - consume Avro GenericRecords
データの使用と逆シリアル化が正常に行われると、プロデューサー アプリケーションに次のコンソール ログが表示されます。
INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-0", "amount": 10.0, "description": "Sample order 0"} INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-1", "amount": 11.0, "description": "Sample order 1"} INFO com.azure.schemaregistry.samples.consumer.KafkaAvroSpecificRecord - Order received: {"id": "ID-2", "amount": 12.0, "description": "Sample order 2"}
リソースをクリーンアップする
Event Hubs 名前空間を削除するか、名前空間を含むリソース グループを削除します。