Spring Tümleştirmesi için Spring Cloud Azure desteği

Bu makale şunlar için geçerlidir:✅ Sürüm 4.19.0 ✅ Sürüm 5.19.0

Azure için Spring Tümleştirme Uzantısı, Javaiçin Azure SDK tarafından sağlanan çeşitli hizmetler için Spring Tümleştirme bağdaştırıcıları sağlar. Şu Azure hizmetleri için Spring Integration desteği sağlıyoruz: Event Hubs, Service Bus, Depolama Kuyruğu. Desteklenen bağdaştırıcıların listesi aşağıdadır:

  • spring-cloud-azure-starter-integration-eventhubs - Daha fazla bilgi için bkz. Azure Event Hubs ile Spring Tümleştirmesi
  • - Daha fazla bilgi için bkz. Azure Service Bus ile Spring Integration
  • spring-cloud-azure-starter-integration-storage-queue - Daha fazla bilgi için bkz. Azure Depolama Kuyruğu ile Spring Tümleştirmesi

Azure Event Hubs ile Spring Tümleştirmesi

Temel kavramlar

Azure Event Hubs, büyük bir veri akışı platformu ve olay alımı hizmetidir. Saniyede milyonlarca olay alabilir ve işleyebilir. Bir olay hub'ına gönderilen veriler, herhangi bir gerçek zamanlı analiz sağlayıcısı veya toplu işlem/depolama bağdaştırıcısı kullanılarak dönüştürülebilir ve depolanabilir.

Spring Integration, Spring tabanlı uygulamalarda basit mesajlaşmaya olanak tanır ve bildirim temelli bağdaştırıcılar aracılığıyla dış sistemlerle tümleştirmeyi destekler. Bu bağdaştırıcılar Spring'in uzaktan iletişim, mesajlaşma ve zamanlama desteği konusunda daha yüksek düzeyde soyutlama sağlar. Event Hubs için Spring Integration uzantısı projesi, Azure Event Hubs için gelen ve giden kanal bağdaştırıcıları ve ağ geçitleri sağlar.

Not

RxJava destek API'leri 4.0.0 sürümünden bırakılır. Ayrıntılar için bkz. Javadoc.

Tüketici grubu

Event Hubs, tüketici grubu için Apache Kafka ile benzer destek sağlar ancak biraz farklı mantık sunar. Kafka, işlenen tüm uzaklıkları aracıda depolasa da, el ile işlenen Event Hubs iletilerinin uzaklıklarını depolamanız gerekir. Event Hubs SDK'sı, bu tür uzaklıkları Azure Depolama'da depolama işlevi sağlar.

Bölümleme desteği

Event Hubs, Kafka ile benzer bir fiziksel bölüm kavramı sağlar. Ancak Kafka'nın tüketiciler ve bölümler arasında otomatik yeniden dengelemeden farklı olarak Event Hubs bir tür önleyici mod sağlar. Depolama hesabı, hangi bölümün hangi tüketiciye ait olduğunu belirlemek için kira görevi görür. Yeni bir tüketici başladığında, iş yükü dengelemeyi başarmak için çoğu ağır yüklü tüketiciden bazı bölümleri çalmaya çalışır.

Geliştiriciler, yük dengeleme stratejisini belirtmek için yapılandırma için EventHubsContainerProperties kullanabilir. yapılandırma örneği için aşağıdaki bölüm bakın.

Batch tüketici desteği

EventHubsInboundChannelAdapter toplu işlem modunu destekler. Kullanıcılar bunu etkinleştirmek için ListenerMode.BATCH örneği oluştururken dinleyici modunu EventHubsInboundChannelAdapter olarak belirtebilir. Etkinleştirildiğinde yükü toplu olayların listesi olan bir İletisi alınır ve aşağı akış kanalına geçirilir. Her ileti üst bilgisi, içeriği her olaydan ayrıştırılan ilişkili üst bilgi değeri olan bir liste olarak da dönüştürülür. Bölüm kimliği, denetim noktası oluşturucu ve son sıraya alınan özelliklerin ortak üst bilgileri için, olay grubunun tamamı için tek bir değer olarak sunulurlar ve aynı değeri paylaşırlar. Daha fazla bilgi için Event Hubs İleti Üst Bilgileri bölümüne bakın.

Not

Denetim noktası üst bilgisi yalnızca MANUAL denetim noktası modu kullanıldığında bulunur.

Toplu tüketici denetim noktası oluşturma iki modu destekler: BATCH ve MANUAL. BATCH modu, olay toplu işleminin tamamını alındıktan sonra birlikte kontrol etmek için otomatik bir denetim noktası modudur. MANUAL modu, kullanıcılara göre olayları kontrol etmektir. Kullanıldığında, Denetim Noktası Denetleyicisi ileti üst bilgisine geçirilir ve kullanıcılar bunu denetim noktası oluşturma amacıyla kullanabilir.

Toplu işlem kullanan ilke, max-size ve max-wait-timeözellikleri tarafından belirtilebilir; burada max-size gerekli bir özelliktir ve max-wait-time isteğe bağlıdır. Geliştiriciler toplu kullanım stratejisini belirtmek için yapılandırma için EventHubsContainerProperties kullanabilir. yapılandırma örneği için aşağıdaki bölüm bakın.

Bağımlılık kurulumu

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfigürasyon

Bu başlatıcı, yapılandırma seçeneklerinin aşağıdaki 3 bölümünü sağlar:

Bağlantı Yapılandırma Özellikleri

Bu bölüm, Azure Event Hubs'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.

Not

Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme bölümüne bakın.

spring-cloud-azure-starter-integration-eventhubs'ın bağlantı yapılandırılabilir özellikleri:

Mülk Tür Açıklama
spring.cloud.azure.eventhubs.enabled Boolean Azure Event Hubs'ın etkinleştirilip etkinleştirilmediği.
spring.cloud.azure.eventhubs.connection-string Dizgi Event Hubs Ad Alanı bağlantı dizesi değeri.
spring.cloud.azure.eventhubs.namespace Dizgi FQDN'nin ön eki olan Event Hubs Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır
spring.cloud.azure.eventhubs.domain-name Dizgi Azure Event Hubs Ad Alanı değerinin etki alanı adı.
spring.cloud.azure.eventhubs.custom-endpoint-address Dizgi Özel Uç Nokta adresi.
spring.cloud.azure.eventhubs.shared-connection Boolean Temel alınan EventProcessorClient ve EventHubProducerAsyncClient'ın aynı bağlantıyı kullanıp kullanmadığı. Varsayılan olarak, oluşturulan her Olay Hub'ı istemcisi için yeni bir bağlantı oluşturulur ve oluşturulur.

Denetim Noktası Yapılandırma Özellikleri

Bu bölüm, bölüm sahipliğini ve denetim noktası bilgilerini kalıcı hale etmek için kullanılan Depolama Blobları hizmetinin yapılandırma seçeneklerini içerir.

Not

sürüm 4.0.0'dan itibaren, spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists özelliği el ile etkinleştirilmediğinde depolama kapsayıcısı otomatik olarak oluşturulmaz.

Spring-cloud-azure-starter-integration-eventhubs'ın yapılandırılabilir özelliklerini denetleme:

Mülk Tür Açıklama
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean Mevcut değilse kapsayıcı oluşturmaya izin verilip verilmeyeceği.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Dizgi Depolama hesabının adı.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Dizgi Depolama hesabı erişim anahtarı.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Dizgi Depolama kapsayıcısı adı.

Yaygın Azure Hizmeti SDK yapılandırma seçenekleri, Depolama Blobu denetim noktası deposu için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasunulmuştur ve birleştirilmiş ön ek spring.cloud.azure. veya spring.cloud.azure.eventhubs.processor.checkpoint-storeön eki ile yapılandırılabilir.

Event Hub işlemci yapılandırma özellikleri

EventHubsInboundChannelAdapter, EventProcessorClient kullanarak bir olay hub'ından gelen iletileri kullanarak bir EventProcessorClientgenel özelliklerini yapılandırabilir ve geliştiriciler yapılandırma için EventHubsContainerProperties kullanabilir. ile çalışma hakkında aşağıdaki bölüme bakın.

Temel kullanım

Azure Event Hubs'a ileti gönderme

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

    • Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Not

tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.

  1. Event Hubs'a ileti göndermek için DefaultMessageHandler çekirdeğiyle EventHubsTemplate oluşturun.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir ileti ağ geçidi bağlaması oluşturun.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Ağ geçidini kullanarak ileti gönderme.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Event Hubs'dan ileti alma

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

  2. Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Event Hubs'dan ileti almak için EventHubsInboundChannelAdapter çekirdeğiyle EventHubsMessageListenerContainer oluşturun.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Daha önce oluşturulan ileti kanalı aracılığıyla EventHubsInboundChannelAdapter ile bir ileti alıcısı bağlaması oluşturun.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

ObjectMapper'ı özelleştirmek için EventHubsMessageConverter'ı yapılandırma

EventHubsMessageConverter, kullanıcıların ObjectMapper'ı özelleştirmesine izin vermek için yapılandırılabilir bir fasulye olarak yapılır.

Batch tüketici desteği

Event Hubs'tan gelen iletileri toplu olarak kullanmak yukarıdaki örneğe benzerdir; ayrıca, kullanıcıların EventHubsInboundChannelAdapteriçin toplu işlem kullanan ilgili yapılandırma seçeneklerini ayarlaması gerekir.

EventHubsInboundChannelAdapteroluştururken dinleyici modu BATCHolarak ayarlanmalıdır. EventHubsMessageListenerContainerçekirdeği oluşturduğunuzda, denetim noktası modunu MANUAL veya BATCHolarak ayarlayın; toplu iş seçenekleri gerektiği gibi yapılandırılabilir.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs ileti üst bilgileri

Aşağıdaki tabloda Event Hubs ileti özelliklerinin Spring ileti üst bilgileriyle nasıl eşlendiği gösterilmektedir. Azure Event Hubs için ileti eventolarak adlandırılır.

Kayıt Dinleyicisi Modunda Event Hubs İletisi / Olay Özellikleri ile Spring message üst bilgileri arasında eşleme:

Event Hubs Olay Özellikleri Spring Message Header Sabitleri Tür Açıklama
Sıralanan süre EventHubsHeaders#ENQUEUED_TIME Dakika Olayın Olay Hub'ı bölümünde ne zaman sıralandığına ilişkin utc olarak anlık değer.
Ofset EventHubsHeaders#OFFSET Uzun İlişkili Olay Hub'ı bölümünden alınan olayın uzaklığı.
Bölüm anahtarı AzureHeaders#PARTITION_KEY Dizgi Olay ilk yayımlandığında ayarlandıysa bölüm karma anahtarı.
Bölüm Kimliği AzureHeaders#RAW_PARTITION_ID Dizgi Olay Hub'ının bölüm kimliği.
Sıra numarası EventHubsHeaders#SEQUENCE_NUMBER Uzun İlişkili Olay Hub'ı bölümünde sıralandığında olaya atanan sıra numarası.
Son sıralanan olay özellikleri EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Bu bölümdeki son sıraya alınan olayın özellikleri.
NA AzureHeaders#CHECKPOINTER Denetim Noktası Oluşturucu Belirli bir iletinin denetim noktası üst bilgisi.

Kullanıcılar, her olayın ilgili bilgileri için ileti üst bilgilerini ayrıştırabilir. Olay için bir ileti üst bilgisi ayarlamak için, tüm özelleştirilmiş üst bilgiler bir olayın uygulama özelliği olarak konur ve burada üst bilgi özellik anahtarı olarak ayarlanır. Event Hubs'dan olaylar alındığında, tüm uygulama özellikleri ileti üst bilgisine dönüştürülür.

Not

Bölüm anahtarı, sıralanan süre, uzaklık ve sıra numarasının ileti üst bilgilerinin el ile ayarlanması desteklenmez.

Toplu iş tüketici modu etkinleştirildiğinde, toplu iletilerin belirli üst bilgileri, her bir Event Hubs olayından değerlerin listesini içeren aşağıdaki şekilde listelenir.

Toplu dinleyici modunda Event Hubs İletisi / Olay Özellikleri ile Spring message üst bilgileri arasında eşleme:

Event Hubs Olay Özellikleri Spring Batch İleti Üst Bilgi Sabitleri Tür Açıklama
Sıralanan süre EventHubsHeaders#ENQUEUED_TIME Anlık Liste Olay Hub'ı bölümünde her olayın ne zaman sıralandığının UTC olarak anlık listesi.
Ofset EventHubsHeaders#OFFSET Uzun Listesi İlişkili Olay Hub'ı bölümünden alınan her olayın uzaklığının listesi.
Bölüm anahtarı AzureHeaders#PARTITION_KEY Dize Listesi Her olayı özgün olarak yayımlarken ayarlandıysa bölüm karma anahtarının listesi.
Sıra numarası EventHubsHeaders#SEQUENCE_NUMBER Uzun Listesi İlişkili Olay Hub'ı bölümünde sıralandığında her olaya atanan sıra numarasının listesi.
Sistem özellikleri EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Harita Listesi Her olayın sistem özelliklerinin listesi.
Uygulama özellikleri EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Harita Listesi Tüm özelleştirilmiş ileti üst bilgilerinin veya olay özelliklerinin yerleştirildiği her olayın uygulama özelliklerinin listesi.

Not

İletiler yayımlandığında, varsa yukarıdaki toplu iş üst bilgilerinin tümü iletilerden kaldırılır.

Örnekleri

Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.

Azure Service Bus ile Spring Tümleştirmesi

Temel kavramlar

Spring Integration, Spring tabanlı uygulamalarda basit mesajlaşmaya olanak tanır ve bildirim temelli bağdaştırıcılar aracılığıyla dış sistemlerle tümleştirmeyi destekler.

Azure Service Bus için Spring Integration uzantısı projesi, Azure Service Bus için gelen ve giden kanal bağdaştırıcıları sağlar.

Not

CompletableFuture destek API'leri 2.10.0 sürümünden kullanım dışı bırakıldı ve 4.0.0 sürümünden Reactor Core ile değiştirildi. Ayrıntılar için bkz. Javadoc.

Bağımlılık kurulumu

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfigürasyon

Bu başlatıcı, yapılandırma seçeneklerinin aşağıdaki 2 bölümünü sağlar:

Bağlantı yapılandırma özellikleri

Bu bölüm, Azure Service Bus'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.

Not

Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme bölümüne bakın.

spring-cloud-azure-starter-integration-servicebus'ın bağlantı yapılandırılabilir özellikleri:

Mülk Tür Açıklama
spring.cloud.azure.servicebus.enabled Boolean Azure Service Bus'ın etkinleştirilip etkinleştirilmediği.
spring.cloud.azure.servicebus.connection-string Dizgi Service Bus Ad Alanı bağlantı dizesi değeri.
spring.cloud.azure.servicebus.custom-endpoint-address Dizgi Service Bus'a bağlanırken kullanılacak özel uç nokta adresi.
spring.cloud.azure.servicebus.namespace Dizgi FQDN ön eki olan Service Bus Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır
spring.cloud.azure.servicebus.domain-name Dizgi Azure Service Bus Ad Alanı değerinin etki alanı adı.

Service Bus işlemci yapılandırma özellikleri

ServiceBusInboundChannelAdapter, bir ServiceBusProcessorClientgenel özelliklerini yapılandırmak üzere iletileri kullanmak için ServiceBusProcessorClient kullanır. Geliştiriciler yapılandırma için ServiceBusContainerProperties kullanabilir. ile çalışma hakkında aşağıdaki bölüme bakın.

Temel kullanım

Azure Service Bus'a ileti gönderme

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

    • Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Not

tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.

  • Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Not

tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.

  1. Service Bus'a ileti göndermek için DefaultMessageHandler bean ile ServiceBusTemplate oluşturun, ServiceBusTemplate için varlık türünü ayarlayın. Bu örnekte örnek olarak Service Bus Kuyruğu alınıyor.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir ileti ağ geçidi bağlaması oluşturun.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Ağ geçidini kullanarak ileti gönderme.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Service Bus'tan ileti alma

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

  2. Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Service Bus'a ileti almak için ServiceBusInboundChannelAdapter çekirdeğiyle ServiceBusMessageListenerContainer oluşturun. Bu örnekte örnek olarak Service Bus Kuyruğu alınıyor.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Daha önce oluşturduğumuz ileti kanalı aracılığıyla ServiceBusInboundChannelAdapter ile bir ileti alıcı bağlaması oluşturun.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

ObjectMapper'ı özelleştirmek için ServiceBusMessageConverter'ı yapılandırma

ServiceBusMessageConverter, kullanıcıların ObjectMapperözelleştirmesine izin vermek için yapılandırılabilir bir fasulye olarak yapılır.

Service Bus ileti üst bilgileri

Birden çok Spring üst bilgisi sabitine eşlenebilen bazı Service Bus üst bilgileri için, farklı Spring üst bilgilerinin önceliği listelenir.

Service Bus Üst Bilgileri ile Spring Üst Bilgileri arasında eşleme:

Service Bus ileti üst bilgileri ve özellikleri Spring message header sabitleri Tür Yapılandırılabilir Açıklama
İçerik türü MessageHeaders#CONTENT_TYPE Dizgi Evet İletinin RFC2045 İçerik Türü tanımlayıcısı.
Bağıntı Kimliği ServiceBusMessageHeaders#CORRELATION_ID Dizgi Evet İletinin bağıntı kimliği
İleti Kimliği ServiceBusMessageHeaders#MESSAGE_ID Dizgi Evet İletinin ileti kimliği, bu üst bilgi MessageHeaders#ID'den daha yüksek önceliğe sahiptir.
İleti Kimliği MessageHeaders#ID UUID Evet İletinin ileti kimliği, bu üst bilgi ServiceBusMessageHeaders#MESSAGE_ID'dan daha düşük önceliğe sahiptir.
Bölüm anahtarı ServiceBusMessageHeaders#PARTITION_KEY Dizgi Evet İletiyi bölümlenmiş bir varlığa göndermek için bölüm anahtarı.
Yanıtla MessageHeaders#REPLY_CHANNEL Dizgi Evet Yanıtların gönderleneceği varlığın adresi.
Oturum kimliğini yanıtlama ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Dizgi Evet İletinin ReplyToGroupId özellik değeri.
Zamanlanan sıra saati utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Evet İletinin Service Bus'ta sıralanması gereken tarih saat, bu üst bilgi AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE'den daha yüksek önceliğe sahiptir.
Zamanlanan sıra saati utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Tam sayı Evet Service Bus'ta iletinin sıraya alınması gereken tarih saat, bu üst bilgi ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME'den daha düşük önceliğe sahiptir.
Oturum Kimliği ServiceBusMessageHeaders#SESSION_ID Dizgi Evet Oturum kullanan bir varlık için oturum IDentifier.
Yaşam süresi ServiceBusMessageHeaders#TIME_TO_LIVE Süre Evet Bu iletinin süresi dolmadan önceki süre.
Hedef ServiceBusMessageHeaders#TO Dizgi Evet yönlendirme senaryolarında gelecekte kullanılmak üzere ayrılmış ve aracı tarafından şu anda yoksayılan iletinin "to" adresi.
Konu ServiceBusMessageHeaders#SUBJECT Dizgi Evet İletinin konusu.
Teslim edilemeyen harf hatası açıklaması ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Dizgi Hayır Teslim edilemeyen iletinin açıklaması.
Teslim edilemeyen mektup nedeni ServiceBusMessageHeaders#DEAD_LETTER_REASON Dizgi Hayır Bir iletinin yazılmama nedeni.
Teslim edilemeyen harf kaynağı ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Dizgi Hayır İletinin teslim edilmediği varlık.
Teslim sayısı ServiceBusMessageHeaders#DELIVERY_COUNT uzun Hayır Bu iletinin istemcilere teslim etme sayısı.
Sıralanmış sıra numarası ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER uzun Hayır Service Bus tarafından bir iletiye atanan sıralı sıra numarası.
Sıralanan süre ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Hayır Bu iletinin Service Bus'ta sıralandığı tarih saat.
Son kullanma tarihi: ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Hayır Bu iletinin süresinin dolacağı tarih saat.
Kilit belirteci ServiceBusMessageHeaders#LOCK_TOKEN Dizgi Hayır Geçerli ileti için kilit belirteci.
Kilitlenene kadar ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Hayır Bu iletinin kilidinin süresinin dolmasına neden olan tarih saat.
Sıra numarası ServiceBusMessageHeaders#SEQUENCE_NUMBER uzun Hayır Service Bus tarafından bir iletiye atanan benzersiz numara.
Devlet ServiceBusMessageHeaders#STATE ServiceBusMessageState Hayır Etkin, Ertelenmiş veya Zamanlanmış olabilecek iletinin durumu.

Bölüm anahtarı desteği

Bu başlatıcı, ileti üst bilgisinde bölüm anahtarı ve oturum kimliği ayarlamaya izin vererek service bus bölümleme destekler. Bu bölümde, iletiler için bölüm anahtarının nasıl ayarlanacağı anlatlenmektedir.

Önerilen: üst bilgi anahtarı olarak ServiceBusMessageHeaders.PARTITION_KEY kullanın.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Önerilmez ancak şu anda desteklenir: üst bilginin anahtarı olarak AzureHeaders.PARTITION_KEY.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Not

İleti üst bilgilerinde hem ServiceBusMessageHeaders.PARTITION_KEY hem de AzureHeaders.PARTITION_KEY ayarlandığında, ServiceBusMessageHeaders.PARTITION_KEY tercih edilir.

Oturum desteği

Bu örnekte, uygulamadaki bir iletinin oturum kimliğinin el ile nasıl ayarlanacağı gösterilmektedir.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Not

İleti üst bilgilerinde ServiceBusMessageHeaders.SESSION_ID ayarlandığında ve farklı bir ServiceBusMessageHeaders.PARTITION_KEY üst bilgisi de ayarlandığında, bölüm anahtarının değerinin üzerine yazmak için oturum kimliğinin değeri sonunda kullanılır.

Service Bus istemci özelliklerini özelleştirme

Geliştiriciler Service Bus İstemcisi özelliklerini özelleştirmek için AzureServiceClientBuilderCustomizer kullanabilir. Aşağıdaki örnek, sessionIdleTimeoutiçindeki ServiceBusClientBuilder özelliğini özelleştirmektedir:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Örnekleri

Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.

Azure Depolama Kuyruğu ile Spring Tümleştirmesi

Temel kavramlar

Azure Kuyruk Depolama, çok sayıda iletiyi depolamaya yönelik bir hizmettir. HTTP veya HTTPS kullanarak kimliği doğrulanmış çağrılar aracılığıyla dünyanın herhangi bir yerinden iletilere erişebilirsiniz. Kuyruk iletisinin boyutu en fazla 64 KB olabilir. Kuyruk, depolama hesabının toplam kapasite sınırına kadar milyonlarca ileti içerebilir. Kuyruklar genellikle zaman uyumsuz olarak işlenmek üzere bir iş kapsamı oluşturmak için kullanılır.

Bağımlılık kurulumu

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfigürasyon

Bu başlatıcı aşağıdaki yapılandırma seçeneklerini sağlar:

Bağlantı yapılandırma özellikleri

Bu bölüm, Azure Depolama Kuyruğu'na bağlanmak için kullanılan yapılandırma seçeneklerini içerir.

Not

Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme bölümüne bakın.

spring-cloud-azure-starter-integration-storage-queue'un bağlantı yapılandırılabilir özellikleri:

Mülk Tür Açıklama
spring.cloud.azure.storage.queue.enabled Boolean Azure Depolama Kuyruğu'nın etkinleştirilip etkinleştirilmediği.
spring.cloud.azure.storage.queue.connection-string Dizgi Depolama Kuyruğu Ad Alanı bağlantı dizesi değeri.
spring.cloud.azure.storage.queue.accountName Dizgi Depolama Kuyruğu hesap adı.
spring.cloud.azure.storage.queue.accountKey Dizgi Depolama Kuyruğu hesap anahtarı.
spring.cloud.azure.storage.queue.endpoint Dizgi Depolama Kuyruğu hizmet uç noktası.
spring.cloud.azure.storage.queue.sasToken Dizgi Sas belirteci kimlik bilgileri
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion API istekleri yapılırken kullanılan QueueServiceVersion.
spring.cloud.azure.storage.queue.messageEncoding Dizgi Kuyruk iletisi kodlama.

Temel kullanım

Azure Depolama Kuyruğuna ileti gönderme

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

    • Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Not

tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.

  • Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Not

tenant-id için izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.

  1. Depolama Kuyruğuna ileti göndermek için DefaultMessageHandler çekirdeğiyle StorageQueueTemplate oluşturun.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir İleti ağ geçidi bağlaması oluşturun.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Ağ geçidini kullanarak ileti gönderme.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Azure Depolama Kuyruğu'ndan ileti alma

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

  2. Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. depolama kuyruğuna ileti almak için StorageQueueMessageSource bean ile StorageQueueTemplate oluşturun.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Daha önce oluşturduğumuz ileti kanalı aracılığıyla son adımda oluşturulan StorageQueueMessageSource ile bir ileti alıcısı bağlaması oluşturun.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Örnekleri

Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.