Azure Event Hubs を使用した Spring Cloud Stream

このチュートリアルでは、Spring Boot アプリケーションで Azure Event Hubs と Spring Cloud Stream Binder Eventhubs を使用してメッセージを送受信する方法について説明します。

前提条件

Note

アカウントにリソースへのアクセス権を付与するには、Azure Event Hubs で、現在使用している Microsoft Entra アカウントに Azure Event Hubs Data ReceiverAzure Event Hubs Data Sender のロールを割り当てます。 次に、Azure Storage アカウントで、現在使用している Microsoft Entra アカウントに Storage Blob Data Contributor ロールを割り当てます。 アクセス ロールの付与の詳細については、「Azure Portal を使用して Azure ロールを割り当てる」および「Microsoft Entra ID を使用して Event Hubs リソースへのアクセスを承認する」を参照してください。

重要

このチュートリアルの手順を完了するには、Spring Boot 2.5 以上のバージョンが必要です。

Azure Event Hubs からのメッセージの送受信

Azure Storage アカウントと Azure Event Hubs を使用すると、Spring Cloud Azure Stream Binder Event Hubs を使用してメッセージを送受信できます。

Spring Cloud Azure Stream Binder Event Hubs モジュールをインストールするには、次の依存関係を pom.xml ファイルに追加します。

  • Spring Cloud Azure 部品表 (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.17.1</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Note

    Spring Boot 2.xを使用している場合は、 spring-cloud-azure-dependencies バージョンを 4.19.0に設定してください。 この部品表(BOM)は、 pom.xml ファイルの <dependencyManagement> セクションで設定する必要があります。 これにより、すべてのSpring Cloud Azure依存関係が同じバージョンを使用していることが保証されます。 このBOMに使用されるバージョンの詳細については、「Spring Cloud Azureのどのバージョンを使うべきか」を参照してください。

  • Spring Cloud Azure Stream Binder Event Hubs アーティファクト:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

アプリケーションをコーディングする

次の手順を実行して、Azure Event Hubs を使用してメッセージを生成および使用するようにアプリケーションを構成します。

  1. 次のプロパティを application.properties ファイルに追加して、Event Hubs の資格情報を構成します。

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    次の表では、構成のフィールドについて説明します。

    フィールド 説明
    spring.cloud.azure.eventhubs.namespace Azure Portal の自分のイベント ハブで取得した名前空間を指定します。
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name このチュートリアルで作成したストレージ アカウントを指定します。
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name ストレージ アカウントのコンテナーを指定します。
    spring.cloud.stream.bindings.consume-in-0.destination このチュートリアルで使用したイベント ハブを指定します。
    spring.cloud.stream.bindings.consume-in-0.group Event Hubs インスタンスのコンシューマー グループを指定します。
    spring.cloud.stream.bindings.supply-out-0.destination このチュートリアルで使用したのと同じイベント ハブを指定します。
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode MANUAL を指定します。
    spring.cloud.function.definition バインドによって公開されている外部送信先にバインドする、機能 Bean を指定します。
    spring.cloud.stream.poller.initial-delay 定期的なトリガーの初期遅延を指定します。 既定値は0です。
    spring.cloud.stream.poller.fixed-delay デフォルトのポーラーの固定遅延をミリ秒単位で指定します。 既定値は 1000 L です。
  2. スタートアップ クラス ファイルを編集して、次の内容を表示します。

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    ヒント

    このチュートリアルでは、構成またはコードに認証操作はありません。 ただし、Azure サービスに接続するには認証が必要です。 認証を完了するには、Azure ID を使用する必要があります。 Spring Cloud Azure では、DefaultAzureCredential を使用します。これは、コードを変更せずに資格情報を取得できるようにするために、Azure ID ライブラリで提供されます。

    DefaultAzureCredential は複数の認証方法をサポートしており、実行時に使用する方法が決定されます。 このアプローチを採用すると、環境固有のコードを実装することなく、異なる環境 (ローカルと運用環境など) で異なる認証方法をアプリに使用できます。 詳細については、DefaultAzureCredential を参照してください。

    ローカル開発環境で認証を完了するには、Azure CLI、Visual Studio Code、PowerShell、またはその他の方法を使用できます。 詳細については、「Java 開発環境での Azure 認証」を参照してください。 Azure ホスティング環境で認証を完了するには、ユーザー割り当てマネージド ID を使用することをお勧めします。 詳細については、「Azure リソースのマネージド ID とは」を参照してください。

  3. アプリケーションを起動します。 このようなメッセージは、次の出力例に示すように、アプリケーション ログに投稿されます。

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

Azure Spring Apps にデプロイする

Spring Boot アプリケーションがローカルで実行されたので、運用環境に移行します。 Azure Spring Apps では、コードを変更せずに、Spring Boot アプリケーションを Azure に簡単にデプロイできます。 Spring アプリケーションのインフラストラクチャはこのサービスによって管理されるため、開発者はコードに専念できます。 Azure Spring Apps では、包括的な監視と診断、構成管理、サービス検出、CI/CD 統合、ブルー/グリーン デプロイなどを使用して、ライフサイクルを管理できます。 Azure Spring Apps にアプリケーションをデプロイするには、「初めてのアプリケーションを Azure Spring Apps にデプロイする」を参照してください。

次のステップ