Java を使用して Azure Event Hubs との間でイベントを送受信する
このクイックスタートでは、azure-messaging-eventhubs Java パッケージを使用して、イベント ハブとの間でイベントを送受信する方法について説明します。
ヒント
Spring アプリケーションで Azure Event Hubs リソースを操作している場合は、Spring Cloud Azure を代替手段として検討することをお勧めします。 Spring Cloud Azure は、Spring と Azure サービスのシームレスな統合を実現するオープンソース プロジェクトです。 Spring Cloud Azure の詳細と Event Hubs の使用例については、「Azure Event Hubs を使用する Spring Cloud Stream with」を参照してください。
前提条件
Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。
このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。
- Microsoft Azure サブスクリプション。 Azure Event Hubs を含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Microsoft Azure アカウントをお持ちでない場合は、アカウントを作成する際に、無料試用版にサインアップするか、MSDN サブスクライバー特典を利用できます。
- Java 開発環境。 このクイックスタートでは Eclipse を使用します。 Java Development Kit (JDK) バージョン 8 以上が必要です。
- Event Hubs 名前空間とイベント ハブを作成する。 最初の手順では、Azure Portal を使用して Event Hubs 型の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。 その後、次の記事の手順に従って、Event Hubs 名前空間用の接続文字列を取得します: 接続文字列を取得する。 この接続文字列は、このクイックスタートの後の手順で必要になります。
送信イベント
このセクションでは、イベント ハブにイベントを送信する Java アプリケーションの作成方法を説明します。
Azure Event Hubs ライブラリへの参照を追加する
まず、好みの Java 開発環境でコンソールまたはシェル アプリケーション用の新しい Maven プロジェクトを作成します。 pom.xml
ファイルを次のように更新します。 Event Hubs 用の Java クライアント ライブラリは、Maven Central Repository にあります。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.18.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.11.2</version>
<scope>compile</scope>
</dependency>
Note
バージョンを、Maven リポジトリに発行された最新バージョンに更新します。
Azure に対してアプリを認証する
このクイック スタートでは、Azure Event Hubs に接続する 2 つの方法である、パスワードレスと接続文字列について説明します。 1 番目のオプションとして、Microsoft Entra ID のセキュリティ プリンシパルとロールベースのアクセス制御 (RBAC) を使用して Event Hubs 名前空間に接続する方法について説明します。 コードや構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。 2 番目のオプションとして、接続文字列を使用して Event Hubs 名前空間に接続する方法について説明します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。 実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、「認証と承認」を参照してください。 パスワードレス認証の詳細については、概要ページを参照してください。
Microsoft Entra ユーザーにロールを割り当てる
ローカルでの開発時には、Azure Event Hubs に接続するユーザー アカウントに正しいアクセス許可があることを確認してください。 メッセージを送受信するには、Azure Event Hubs データ所有者ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write
アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページを参照してください。
次の例では、ユーザー アカウントに Azure Event Hubs Data Owner
ロールを割り当てます。これにより、Azure Event Hubs リソースにフル アクセスできます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。
Azure Event Hubs の Azure の組み込みロール
Azure Event Hubs の場合、Azure portal および Azure リソース管理 API による名前空間とそれに関連するすべてのリソースの管理は、Azure RBAC モデルを使用して既に保護されています。 Azure では、Event Hubs 名前空間へのアクセス権を付与するために、次の Azure 組み込みロールが提供されています。
- Azure Event Hubs データ所有者: Event Hubs 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、フィルター) へのデータ アクセスが可能です。
- Azure Event Hubs データ送信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を送信者に付与します。
- Azure Event Hubs データ受信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を受信者に付与します。
カスタム ロールを作成する場合は、Event Hubs 操作に必要な権限に関するページを参照してください。
重要
ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。
Azure portal で、メインの検索バーまたは左側のナビゲーションを使用して Event Hubs 名前空間を見つけます。
概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。
[アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。
上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。
検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、
Azure Event Hubs Data Owner
を検索して一致する結果を選択します。 [次へ] を選びます。[アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。
ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。
[レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。
イベント ハブにメッセージを送信するコードの記述
Sender
という名前のクラスを追加し、このクラスに次のコードを追加します。
重要
<NAMESPACE NAME>
は、実際の Event Hubs 名前空間の名前に更新してください。<EVENT HUB NAME>
をイベント ハブの名前に更新します。
package ehubquickstart;
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
import com.azure.identity.*;
public class SenderAAD {
// replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
// Example: private static final String namespaceName = "contosons.servicebus.windows.net";
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
// Replace <EVENT HUB NAME> with the name of your event hub.
// Example: private static final String eventHubName = "ordersehub";
private static final String eventHubName = "<EVENT HUB NAME>";
public static void main(String[] args) {
publishEvents();
}
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a token using the default Azure credential
DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
.authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
.build();
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace(namespaceName)
.eventHubName(eventHubName)
.credential(credential)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
}
プログラムをビルドし、エラーがないことを確認します。 このプログラムは、受信側プログラムを実行した後に実行します。
受信イベント
このチュートリアルのコードは GitHub の EventProcessorClient サンプルに基づくものであり、完全に動作するアプリケーションを表示する場合に確認することができます。
チェックポイント ストアとして Azure Blob Storage を使用する場合は、次の推奨事項に従ってください。
- コンシューマー グループごとに個別のコンテナーを使用します。 同じストレージ アカウントを使用できますが、各グループごとに 1 つのコンテナーを使用します。
- コンテナーを他の何かに使用しないでください。また、ストレージ アカウントも他の何かに使用しないでください。
- ストレージ アカウントは、デプロイされたアプリケーションが配置されているのと同じリージョンに存在する必要があります。 アプリケーションがオンプレミスの場合は、可能な中で最も近いリージョンを選択することを試みてください。
Azure portal の [ストレージ アカウント] ページの [Blob service] セクションで、次の設定が無効になっていることを確認してください。
- 階層型名前空間
- BLOB の論理的な削除
- バージョン管理
Azure Storage と BLOB コンテナーを作成する
このクイックスタートでは、チェックポイント ストアとして Azure Storage (具体的には Blob Storage) を使用します。 チェックポイント処理とは、イベント プロセッサがパーティション内の最後に正常に処理されたイベントの位置をマークまたはコミットするために使用する処理です。 チェックポイントのマークは通常、イベントを処理する関数内で付けられます。 チェックポイント処理の詳細については、イベント プロセッサに関するページを参照してください。
Azure ストレージ アカウントを作成するには、次の手順に従います。
- Azure ストレージ アカウントの作成
- BLOB コンテナーを作成する
- BLOB コンテナーに対する認証
ローカルで開発する場合は、BLOB データにアクセスするユーザー アカウントに正しいアクセス許可があることを確認します。 BLOB データの読み取りと書き込みを行うには、ストレージ BLOB データ共同作成者が必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールに割り当てられている必要があります。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページでご覧いただけます。
このシナリオでは、最小限の特権の原則に従って、ストレージ アカウントに限定したアクセス許可をユーザー アカウントに割り当てます。 この方法を使って、ユーザーに必要最小限のアクセス許可のみを与え、より安全な運用環境を作成します。
次の例では、ストレージ BLOB データ共同作成者ロールを自分のユーザー アカウントに割り当てます。これにより、そのストレージ アカウント内の BLOB データに対する読み取りと書き込みの両方のアクセス権が付与されます。
重要
ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1 分から 2 分ですが、まれに 8 分程度までかかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。
Azure portal で、メインの検索バーまたは左側のナビゲーションを使ってストレージ アカウントを見つけます。
ストレージ アカウントの概要ページで、左側のメニューから [アクセス制御 (IAM)] を選びます。
[アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。
上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。
検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、ストレージ BLOB データ共同作成者を検索し、一致する結果を選び、[次へ] を選びます。
[アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。
ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。
[レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。
Java プロジェクトに Event Hubs ライブラリを追加する
pom.xml ファイルに次の依存関係を追加します。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
次の
import
ステートメントを Java ファイルの先頭に追加します。import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer; import com.azure.identity.*;
Receiver
という名前のクラスを作成し、そのクラスに次の文字列変数を追加します。 プレースホルダーを適切な値に置き換えます。重要
プレースホルダーを適切な値に置き換えます。
<NAMESPACE NAME>
をご利用の Event Hubs 名前空間の名前に置換します。<EVENT HUB NAME>
を、名前空間内でご利用のイベント ハブの名前で置き換えます。
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net"; private static final String eventHubName = "<EVENT HUB NAME>";
クラスに次の
main
メソッドを追加します。重要
プレースホルダーを適切な値に置き換えます。
<STORAGE ACCOUNT NAME>
は、実際の Azure Storage アカウントの名前にします。<CONTAINER NAME>
を、ストレージ アカウントの BLOB コンテナーの名前にします。
// create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD) .build(); // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .credential(credential) .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net") .containerName("<CONTAINER NAME>") .buildAsyncClient(); // Create an event processor client to receive and process events and errors. EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder() .fullyQualifiedNamespace(namespaceName) .eventHubName(eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .credential(credential) .buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process");
イベントとエラーを処理する 2 つのヘルパー メソッド (
PARTITION_PROCESSOR
とERROR_HANDLER
) をReceiver
クラスに追加します。public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };
プログラムをビルドし、エラーがないことを確認します。
アプリケーションの実行
まず、受信側アプリケーションを実行します。
次に、送信側アプリケーションを実行します。
受信側アプリケーション ウィンドウで、送信側アプリケーションによって発行されたイベントが表示されていることを確認します。
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar
アプリケーションを停止するには、受信側アプリケーション ウィンドウで Enter キーを押します。
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
次のステップ
GitHub の次のサンプルを参照してください。