Kafka API 用の Azure Event Hubs で Spring Kafka を使用する
このチュートリアルでは、Azure Event Hubs でメッセージを送受信するために Kafka 用 Azure Event Hubs を使用するように Java ベースの Spring Cloud Stream Binder を構成する方法について説明します。 詳細については、「Apache Kafka アプリケーションから Azure Event Hubs を使用する」を参照してください。
このチュートリアルでは、Microsoft Entra 認証と Shared Access Signature (SAS) 認証の 2 つの認証が含まれます。 [パスワードレス] タブには Microsoft Entra 認証が表示され、[接続文字列] タブには SAS 認証が表示されます。
Microsoft Entra 認証は、Microsoft Entra ID で定義された ID を使用して、Kafka 用 Azure Event Hubs に接続するメカニズムです。 Microsoft Entra 認証を使用すると、データベース ユーザーの ID や他の Microsoft サービスを一元管理でき、アクセス許可の管理が容易になります。
SAS 認証では、Kafka 用 Event Hubs への委任されたアクセスに Azure Event Hubs 名前空間の接続文字列が使用されます。 Shared Access Signature を資格情報として使用する場合は、接続文字列を自分で管理する必要があります。
前提条件
Azure サブスクリプション - 無料アカウントを作成します。
Java Development Kit (JDK) バージョン 8 以降。
Apache Maven、バージョン 3.2 以降。
cURL または機能をテストするための類似の HTTP ユーティリティ。
Azure Cloud Shell または Azure CLI 2.37.0 以降。
Azure Evet Hub。 ない場合は、Azure Portal を使って Event Hub を作ることができます。
Spring Boot アプリケーション。 ない場合は、Spring Initializr で Maven プロジェクトを作成します。 必ず [Mavenプロジェクト] を選択し、[依存関係] で [Spring Web]、[Spring for Apache Kafka]、および [Cloud Stream] の依存関係を追加して から、Java バージョン 8 以降を選択してください。
重要
このチュートリアルの手順を完了するには、Spring Boot 2.5 以上のバージョンが必要です。
資格情報を準備する
Azure Event Hubs では、Microsoft Entra ID を使用した Event Hubs リソースへの要求の承認がサポートされています。 Microsoft Entra ID では、Azure ロールベースのアクセス制御 (Azure RBAC) を使用して、セキュリティ プリンシパルへのアクセスを許可します (これは、ユーザーまたはアプリケーション サービス プリンシパルである場合があります)。
このサンプルを Microsoft Entra 認証でローカルに実行する場合は、Azure Toolkit for IntelliJ、Visual Studio Code Azure Account プラグインまたは Azure CLI でユーザー アカウントが認証されていることを確認します。 また、アカウントに十分なアクセス許可が付与されていることを確認します。
Note
パスワードレス接続を使用する場合は、アカウントにリソースへのアクセス権を付与する必要があります。 Azure Event Hubs で、現在使用している Microsoft Entra アカウントに Azure Event Hubs Data Receiver
ロールと Azure Event Hubs Data Sender
ロールを割り当てます。 アクセス ロールの付与の詳細については、「Azure Portal を使用して Azure ロールを割り当てる」および「Microsoft Entra ID を使用して Event Hubs リソースへのアクセスを承認する」を参照してください。
Azure Event Hubs からのメッセージの送受信
Azure Event Hubs を使用すると、Spring Cloud Azure を使用してメッセージを送受信できます。
Spring Cloud Azure Starter モジュールをインストールするには、次の依存関係を pom.xml ファイルに追加します。
Spring Cloud Azure 部品表 (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.18.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Note
Spring Boot 2.xを使用している場合は、
spring-cloud-azure-dependencies
バージョンを4.19.0
に設定してください。 この部品表(BOM)は、 pom.xml ファイルの<dependencyManagement>
セクションで設定する必要があります。 これにより、すべてのSpring Cloud Azure依存関係が同じバージョンを使用していることが保証されます。 このBOMに使用されるバージョンの詳細については、「Spring Cloud Azureのどのバージョンを使うべきか」を参照してください。Spring Cloud Azure Starter アーティファクト:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency>
アプリケーションをコーディングする
次の手順を実行して、Azure Event Hubs を使用してメッセージを生成および使用するようにアプリケーションを構成します。
次のプロパティを application.properties ファイルに追加して、Event Hubs の資格情報を構成します。
spring.cloud.stream.kafka.binder.brokers=${AZ_EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 spring.cloud.function.definition=consume;supply spring.cloud.stream.bindings.consume-in-0.destination=${AZ_EVENTHUB_NAME} spring.cloud.stream.bindings.consume-in-0.group=$Default spring.cloud.stream.bindings.supply-out-0.destination=${AZ_EVENTHUB_NAME}
ヒント
バージョン
spring-cloud-azure-dependencies:4.3.0
を使用している場合は、値com.azure.spring.cloud.autoconfigure.kafka.AzureKafkaSpringCloudStreamConfiguration
を持つプロパティspring.cloud.stream.binders.<kafka-binder-name>.environment.spring.main.sources
を追加する必要があります。4.4.0
のため、このプロパティは、自動追加されるので、手動で追加する必要はありません。次の表では、構成のフィールドについて説明します。
フィールド 説明 spring.cloud.stream.kafka.binder.brokers
Azure Event Hubs エンドポイントを指定します。 spring.cloud.stream.bindings.consume-in-0.destination
入力先のイベント ハブを指定します。ここでは、このチュートリアルで作成したハブを指定します。 spring.cloud.stream.bindings.consume-in-0.group
Azure Event Hubs のコンシューマー グループを指定します。Azure Event Hubs インスタンスの作成時に作成された基本コンシューマー グループを使用するには、 $Default
に設定します。spring.cloud.stream.bindings.supply-out-0.destination
出力先のイベント ハブを指定します。ここでは、入力先と同じものになります。 Note
トピックの自動作成を有効にする場合は、構成項目
spring.cloud.stream.kafka.binder.replicationFactor
を追加し、値を 1 以上に設定します。 詳細については、「Spring Cloud Stream Kafka Binder Reference Guide」を参照してください。スタートアップ クラス ファイルを編集して、次の内容を表示します。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; @SpringBootApplication public class EventHubKafkaBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(EventHubKafkaBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->LOGGER.info("New message received: '{}'", message.getPayload()); } @Override public void run(String... args) { many.emitNext(new GenericMessage<>("Hello World"), Sinks.EmitFailureHandler.FAIL_FAST); } }
ヒント
このチュートリアルでは、構成またはコードに認証操作はありません。 ただし、Azure サービスに接続するには認証が必要です。 認証を完了するには、Azure ID を使用する必要があります。 Spring Cloud Azure では、
DefaultAzureCredential
を使用します。これは、コードを変更せずに資格情報を取得できるようにするために、Azure ID ライブラリで提供されます。DefaultAzureCredential
は複数の認証方法をサポートしており、実行時に使用する方法が決定されます。 このアプローチを採用すると、環境固有のコードを実装することなく、異なる環境 (ローカルと運用環境など) で異なる認証方法をアプリに使用できます。 詳細については、DefaultAzureCredential を参照してください。ローカル開発環境で認証を完了するには、Azure CLI、Visual Studio Code、PowerShell、またはその他の方法を使用できます。 詳細については、「Java 開発環境での Azure 認証」を参照してください。 Azure ホスティング環境で認証を完了するには、ユーザー割り当てマネージド ID を使用することをお勧めします。 詳細については、「Azure リソースのマネージド ID とは」を参照してください。
アプリケーションを起動します。 次の例のようなメッセージは、アプリケーション ログに投稿されます。
Kafka version: 3.0.1 Kafka commitId: 62abe01bee039651 Kafka startTimeMs: 1622616433956 New message received: 'Hello World'
Azure Spring Apps にデプロイする
Spring Boot アプリケーションがローカルで実行されたので、運用環境に移行します。 Azure Spring Apps では、コードを変更せずに、Spring Boot アプリケーションを Azure に簡単にデプロイできます。 Spring アプリケーションのインフラストラクチャはこのサービスによって管理されるため、開発者はコードに専念できます。 Azure Spring Apps では、包括的な監視と診断、構成管理、サービス検出、CI/CD 統合、ブルー/グリーン デプロイなどを使用して、ライフサイクルを管理できます。 Azure Spring Apps にアプリケーションをデプロイするには、「初めてのアプリケーションを Azure Spring Apps にデプロイする」を参照してください。