Azure Event Hubs ile Spring Cloud Stream

Bu öğreticide, bir Spring Boot uygulamasında Azure Event Hubs ve Spring Cloud Stream Binder Eventhubs kullanarak ileti gönderme ve alma işlemleri gösterilmektedir.

Önkoşullar

  • Azure aboneliği - ücretsiz bir abonelik oluşturun.

  • Java Development Kit (JDK) sürüm 8 veya üzeri.

  • Apache Maven, sürüm 3.2 veya üzeri.

  • cURL veya işlevselliği test etmek için benzer bir HTTP yardımcı programı.

  • Azure Olay Hub'ı. Yoksa Azure portalını kullanarak bir olay hub'ı oluşturun.

  • Olay hub'ı denetim noktaları için Azure Depolama Hesabı. Hesabınız yoksa bir depolama hesabı oluşturun.

  • Spring Boot uygulaması. Yoksa Spring Initializr ile bir Maven projesi oluşturun. Maven Projesi'ni seçtiğinizden emin olun ve Bağımlılıklar'ın altında Spring Web ve Azure Desteği bağımlılıklarını ekleyin, ardından Java sürüm 8 veya üzerini seçin.

Not

Hesabınıza kaynaklara erişim vermek için Azure Event Hubs'da ve Azure Event Hubs Data Sender rolünü kullanmakta olduğunuz Microsoft Entra hesabına atayınAzure Event Hubs Data Receiver. Ardından Azure Depolama hesabında, rolü şu anda kullanmakta Storage Blob Data Contributor olduğunuz Microsoft Entra hesabına atayın. Erişim rolleri verme hakkında daha fazla bilgi için bkz . Azure portalını kullanarak Azure rolleri atama ve Microsoft Entra Id kullanarak Event Hubs kaynaklarına erişim yetkilendirme.

Önemli

Bu öğreticideki adımları tamamlamak için Spring Boot sürüm 2.5 veya üzeri gereklidir.

Azure Event Hubs'dan ileti gönderme ve alma

Azure Depolama Hesabı ve Azure Olay hub'ı ile Spring Cloud Azure Stream Binder Event Hubs kullanarak ileti gönderip alabilirsiniz.

Spring Cloud Azure Stream Binder Event Hubs modülünü yüklemek için pom.xml dosyanıza aşağıdaki bağımlılıkları ekleyin:

  • Spring Cloud Azure Ürün Reçetesi (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.18.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Not

    Spring Boot 2.x kullanıyorsanız, sürümünü olarak 4.19.0ayarladığınızdan spring-cloud-azure-dependencies emin olun. Bu Ürün Reçetesi (BOM), pom.xml dosyanızın bölümünde yapılandırılmalıdır<dependencyManagement>. Bu, tüm Spring Cloud Azure bağımlılıklarının aynı sürümü kullanmasını sağlar. Bu ürün reçetesi için kullanılan sürüm hakkında daha fazla bilgi için bkz . Spring Cloud Azure'ın Hangi Sürümünü Kullanmalıyım.

  • Spring Cloud Azure Stream Binder Event Hubs yapıtı:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

Uygulamayı kodlama

Uygulamanızı Azure Event Hubs kullanarak ileti üretecek ve kullanacak şekilde yapılandırmak için aşağıdaki adımları kullanın.

  1. Application.properties dosyanıza aşağıdaki özellikleri ekleyerek Olay hub'ı kimlik bilgilerini yapılandırın.

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    Aşağıdaki tabloda yapılandırmadaki alanlar açıklanmaktadır:

    Alan Açıklama
    spring.cloud.azure.eventhubs.namespace Azure portalından olay hub'ınızda aldığınız ad alanını belirtin.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Bu öğreticide oluşturduğunuz depolama hesabını belirtin.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Depolama hesabınızın kapsayıcısını belirtin.
    spring.cloud.stream.bindings.consume-in-0.destination Bu öğreticide kullandığınız olay hub'ını belirtin.
    spring.cloud.stream.bindings.consume-in-0.group Event Hubs Örneğinizdeki Tüketici gruplarını belirtin.
    spring.cloud.stream.bindings.supply-out-0.destination Bu öğreticide kullandığınız olay hub'ını belirtin.
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode MANUAL belirtin.
    spring.cloud.function.definition Bağlamalar tarafından kullanıma sunulan dış hedeflere hangi işlevsel çekirdeğin bağlanacağını belirtin.
    spring.cloud.stream.poller.initial-delay Düzenli tetikleyiciler için ilk gecikmeyi belirtin. Varsayılan değer 0'dır.
    spring.cloud.stream.poller.fixed-delay Varsayılan poller için sabit gecikmeyi milisaniye cinsinden belirtin. Varsayılan değer 1000 L'dir.
  2. Aşağıdaki içeriği göstermek için başlangıç sınıfı dosyasını düzenleyin.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    İpucu

    Bu öğreticide yapılandırmalarda veya kodda kimlik doğrulama işlemi yoktur. Ancak Azure hizmetlerine bağlanmak için kimlik doğrulaması gerekir. Kimlik doğrulamasını tamamlamak için Azure Identity kullanmanız gerekir. Spring Cloud Azure, azure kimlik kitaplığının herhangi bir kod değişikliği yapmadan kimlik bilgilerini almanıza yardımcı olmak için sağladığı öğesini kullanır DefaultAzureCredential.

    DefaultAzureCredential birden çok kimlik doğrulama yöntemini destekler ve çalışma zamanında hangi yöntemin kullanılacağını belirler. Bu yaklaşım, uygulamanızın ortama özgü kod uygulamadan farklı ortamlarda (yerel ve üretim ortamları gibi) farklı kimlik doğrulama yöntemleri kullanmasını sağlar. Daha fazla bilgi için bkz . DefaultAzureCredential.

    Yerel geliştirme ortamlarında kimlik doğrulamasını tamamlamak için Azure CLI, Visual Studio Code, PowerShell veya diğer yöntemleri kullanabilirsiniz. Daha fazla bilgi için bkz . Java geliştirme ortamlarında Azure kimlik doğrulaması. Azure barındırma ortamlarında kimlik doğrulamasını tamamlamak için kullanıcı tarafından atanan yönetilen kimliği kullanmanızı öneririz. Daha fazla bilgi için bkz. Azure kaynakları için yönetilen kimlikler nelerdir?

  3. Uygulamayı başlatın. Aşağıdaki örnek çıktıda gösterildiği gibi, bunun gibi iletiler uygulama günlüğünüzde yayımlanır:

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

Azure Spring Apps'e dağıtma

Spring Boot uygulamasını yerel olarak çalıştırdığınıza göre artık uygulamayı üretim ortamına taşımanın zamanı geldi. Azure Spring Apps , kod değişikliği yapmadan Spring Boot uygulamalarını Azure'a dağıtmayı kolaylaştırır. Hizmet, geliştiricilerin kodlarına odaklanabilmesi için Spring uygulamalarının altyapısını yönetir. Azure Spring Apps kapsamlı izleme ve tanılama, yapılandırma yönetimi, hizmet bulma, CI/CD tümleştirmesi, mavi-yeşil dağıtımlar ve daha fazlasını kullanarak yaşam döngüsü yönetimi sağlar. Uygulamanızı Azure Spring Apps'e dağıtmak için bkz . İlk uygulamanızı Azure Spring Apps'e dağıtma.

Sonraki adımlar