Spring Integration için Spring Cloud Azure desteği

Bu makale şunlar için geçerlidir: ✔️ Sürüm 4.14.0 ✔️ Sürüm 5.8.0

Azure için Spring Integration Uzantısı, Java için Azure SDK tarafından sağlanan çeşitli hizmetler için Spring Integration bağdaştırıcıları sağlar. Şu Azure hizmetleri için Spring Integration desteği sunuyoruz: Event Hubs, Service Bus, Depolama Kuyruğu. Desteklenen bağdaştırıcıların listesi aşağıdadır:

Azure Event Hubs ile Spring Tümleştirmesi

Temel kavramlar

Azure Event Hubs, büyük bir veri akışı platformu ve olay alımı hizmetidir. Saniyede milyonlarca olayı alabilir ve işleyebilir. Bir olay hub’ına gönderilen veriler, herhangi bir gerçek zamanlı analiz sağlayıcısı ve işlem grubu oluşturma/depolama bağdaştırıcıları kullanılarak dönüştürülüp depolanabilir.

Spring Integration, Spring tabanlı uygulamalarda basit mesajlaşmaya olanak tanır ve bildirim temelli bağdaştırıcılar aracılığıyla dış sistemlerle tümleştirmeyi destekler. Bu bağdaştırıcılar Spring'in uzaktan iletişim, mesajlaşma ve zamanlama desteği konusunda daha yüksek düzeyde soyutlama sağlar. Event Hubs için Spring Tümleştirmesi uzantısı projesi, Azure Event Hubs için gelen ve giden kanal bağdaştırıcıları ve ağ geçitleri sağlar.

Dekont

RxJava destek API'leri 4.0.0 sürümünden bırakılır. Ayrıntılar için bkz. Javadoc.

Tüketici grubu

Event Hubs, tüketici grubu için Apache Kafka ile benzer destek sağlar ancak biraz farklı mantık sunar. Kafka, işlenen tüm uzaklıkları aracıda depolasa da, el ile işlenen Event Hubs iletilerinin uzaklıklarını depolamanız gerekir. Event Hubs SDK'sı, bu tür uzaklıkları Azure Depolama içinde depolama işlevi sağlar.

Bölümleme desteği

Event Hubs, Kafka ile benzer bir fiziksel bölüm kavramı sağlar. Ancak Kafka'nın tüketiciler ve bölümler arasında otomatik yeniden dengelemeden farklı olarak Event Hubs bir tür önleyici mod sağlar. Depolama hesabı, hangi bölümün hangi tüketiciye ait olduğunu belirlemek için kira görevi görür. Yeni bir tüketici başladığında, iş yükü dengelemeyi başarmak için çoğu ağır yüklü tüketiciden bazı bölümleri çalmaya çalışır.

Geliştiriciler, yük dengeleme stratejisini belirtmek için yapılandırma için kullanabilir EventHubsContainerProperties . yapılandırma EventHubsContainerPropertiesörneği için aşağıdaki bölüme bakın.

Batch tüketici desteği

, EventHubsInboundChannelAdapter toplu işlem kullanma modunu destekler. Bunu etkinleştirmek için kullanıcılar bir örneği oluştururken EventHubsInboundChannelAdapter olduğu gibi ListenerMode.BATCH dinleyici modunu belirtebilir. Etkinleştirildiğinde, yükü toplu olayların listesi olan bir İleti alınır ve aşağı akış kanalına geçirilir. Her ileti üst bilgisi, içeriği her olaydan ayrıştırılan ilişkili üst bilgi değeri olan bir liste olarak da dönüştürülür. Bölüm kimliği, denetim noktası oluşturucu ve son sıraya alınan özelliklerin ortak üst bilgileri için, olay grubunun tamamı için tek bir değer olarak sunulurlar ve aynı değeri paylaşırlar. Daha fazla bilgi için Event Hubs İleti Üst Bilgileri bölümüne bakın.

Dekont

Denetim noktası üst bilgisi yalnızca EL ile denetim noktası modu kullanıldığında bulunur.

Toplu iş tüketicisinin denetim noktası oluşturma işlemi iki modu destekler: BATCH ve MANUAL. BATCH modu, olay toplu işleminin tamamını alındıktan sonra birlikte kontrol etmek için otomatik bir denetim noktası modudur. MANUAL modu, kullanıcılara göre olayları kontrol etmektir. Kullanıldığında, Denetim Noktası Oluşturucu ileti üst bilgisine geçirilir ve kullanıcılar denetim noktası oluşturma işlemi yapmak için bunu kullanabilir.

Toplu işlem kullanan ilke ve max-wait-timeözellikleri max-size tarafından belirtilebilir; burada max-size gerekli bir özelliktir ve max-wait-time isteğe bağlıdır. Geliştiriciler, toplu kullanım stratejisini belirtmek için yapılandırma için kullanabilir EventHubsContainerProperties . yapılandırma EventHubsContainerPropertiesörneği için aşağıdaki bölüme bakın.

Bağımlılık kurulumu

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Yapılandırma

Bu başlatıcı, yapılandırma seçeneklerinin aşağıdaki 3 bölümünü sağlar:

Bağlan ion Yapılandırma Özellikleri

Bu bölüm, Azure Event Hubs'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.

Dekont

Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz, güvenlik sorumlusuna Azure kaynağına erişmek için yeterli iznin verildiğinden emin olmak için bkz . Microsoft Entra Id ile erişimi yetkilendirme.

spring-cloud-azure-starter-integration-eventhubs Bağlan ion yapılandırılabilir özellikleri:

Özellik Türü Açıklama
spring.cloud.azure.eventhubs.enabled boolean Azure Event Hubs'ın etkinleştirilip etkinleştirilmediği.
spring.cloud.azure.eventhubs.connection-string String Event Hubs Ad Alanı bağlantı dizesi değeri.
spring.cloud.azure.eventhubs.namespace String FQDN'nin ön eki olan Event Hubs Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır
spring.cloud.azure.eventhubs.domain-name String Azure Event Hubs Ad Alanı değerinin etki alanı adı.
spring.cloud.azure.eventhubs.custom-endpoint-address String Özel Uç Nokta adresi.
spring.cloud.azure.eventhubs.shared-connection Boolean Temel alınan EventProcessorClient ve EventHubProducerAsyncClient'ın aynı bağlantıyı kullanıp kullanmadığı. Varsayılan olarak, oluşturulan her Olay Hub'ı istemcisi için yeni bir bağlantı oluşturulur ve oluşturulur.

Denetim Noktası Yapılandırma Özellikleri

Bu bölüm, bölüm sahipliğini ve denetim noktası bilgilerini kalıcı hale etmek için kullanılan Depolama Blobları hizmetinin yapılandırma seçeneklerini içerir.

Dekont

Sürüm 4.0.0'dan itibaren spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists özelliği el ile etkinleştirilmediğinde Depolama kapsayıcısı otomatik olarak oluşturulmaz.

Spring-cloud-azure-starter-integration-eventhubs'ın yapılandırılabilir özelliklerini denetleme:

Özellik Türü Açıklama
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean Mevcut değilse kapsayıcı oluşturmaya izin verilip verilmeyeceği.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String Depolama hesabının adı.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String hesap erişim anahtarını Depolama.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String kapsayıcı adını Depolama.

Yaygın Azure Hizmeti SDK yapılandırma seçenekleri Depolama Blob denetim noktası deposu için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasında kullanıma sunulmuştur ve birleştirilmiş ön ek veya ön ekiyle spring.cloud.azure.spring.cloud.azure.eventhubs.processor.checkpoint-storeyapılandırılabilir.

Event Hub işlemci yapılandırma özellikleri

EventProcessorClient, EventHubsInboundChannelAdapter bir olay hub'ından gelen iletileri kullanmak, geliştiricilerin EventProcessorClientyapılandırma için kullanabileceği EventHubsContainerProperties genel özelliklerini yapılandırmak için kullanır. ile EventHubsInboundChannelAdapterçalışma hakkında aşağıdaki bölüme bakın.

Temel kullanım

Azure Event Hubs'a ileti gönderme

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

    • bağlantı dizesi kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Dekont

için tenant-id izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.

  1. Event Hubs'a EventHubsTemplate ileti göndermek için bean ile oluşturunDefaultMessageHandler.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir ileti ağ geçidi bağlaması oluşturun.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Ağ geçidini kullanarak ileti gönderme.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Event Hubs'dan ileti alma

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

  2. Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Event Hubs'dan EventHubsMessageListenerContainer ileti almak için bean ile oluşturunEventHubsInboundChannelAdapter.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Daha önce oluşturulan ileti kanalı aracılığıyla EventHubsInboundChannelAdapter ile bir ileti alıcısı bağlaması oluşturun.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

ObjectMapper'ı özelleştirmek için EventHubsMessageConverter'ı yapılandırma

EventHubsMessageConverter , kullanıcıların ObjectMapper'ı özelleştirmesine izin vermek için yapılandırılabilir bir fasulye olarak yapılır.

Batch tüketici desteği

Event Hubs'dan gelen iletileri toplu olarak kullanmak yukarıdaki örneğe benzer; ayrıca, kullanıcılar için EventHubsInboundChannelAdaptertoplu işlem kullanan ilgili yapılandırma seçeneklerini ayarlamalıdır.

oluşturulurken EventHubsInboundChannelAdapterdinleyici modu olarak BATCHayarlanmalıdır. bean EventHubsMessageListenerContaineroluştururken, denetim noktası modunu veya BATCHolarak MANUAL ayarlayın; toplu iş seçenekleri gerektiği gibi yapılandırılabilir.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs ileti üst bilgileri

Aşağıdaki tabloda Event Hubs ileti özelliklerinin Spring ileti üst bilgileriyle nasıl eşlendiği gösterilmektedir. Azure Event Hubs için ileti olarak eventadlandırılır.

Kayıt Dinleyicisi Modunda Event Hubs İletisi / Olay Özellikleri ile Spring message üst bilgileri arasında eşleme:

Event Hubs Olay Özellikleri Spring Message Header Sabitleri Türü Açıklama
Sıralanan süre EventHubsHeaders#ENQUEUED_TIME Anlık Olayın Olay Hub'ı bölümünde ne zaman sıralandığına ilişkin utc olarak anlık değer.
Atlanacak sayı EventHubsHeaders#OFFSET Uzun İlişkili Olay Hub'ı bölümünden alınan olayın uzaklığı.
Bölüm anahtarı AzureHeaders#PARTITION_KEY String Olay ilk yayımlandığında ayarlandıysa bölüm karma anahtarı.
Bölüm Kimliği AzureHeaders#RAW_PARTITION_ID String Olay Hub'ının bölüm kimliği.
Numara serisi EventHubsHeaders#SEQUENCE_NUMBER Uzun İlişkili Olay Hub'ı bölümünde sıralandığında olaya atanan sıra numarası.
Son sıralanan olay özellikleri EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Bu bölümdeki son sıraya alınan olayın özellikleri.
NA AzureHeaders#CHECKPOINTER Denetim Noktası Oluşturucu Belirli bir iletinin denetim noktası üst bilgisi.

Kullanıcılar, her olayın ilgili bilgileri için ileti üst bilgilerini ayrıştırabilir. Olay için bir ileti üst bilgisi ayarlamak için, tüm özelleştirilmiş üst bilgiler bir olayın uygulama özelliği olarak konur ve burada üst bilgi özellik anahtarı olarak ayarlanır. Event Hubs'dan olaylar alındığında, tüm uygulama özellikleri ileti üst bilgisine dönüştürülür.

Dekont

Bölüm anahtarı, sıralanan süre, uzaklık ve sıra numarasının ileti üst bilgilerinin el ile ayarlanması desteklenmez.

Toplu iş tüketici modu etkinleştirildiğinde, toplu iletilerin belirli üst bilgileri, her bir Event Hubs olayından değerlerin listesini içeren aşağıdaki şekilde listelenir.

Toplu dinleyici modunda Event Hubs İletisi / Olay Özellikleri ile Spring message üst bilgileri arasında eşleme:

Event Hubs Olay Özellikleri Spring Batch İleti Üst Bilgi Sabitleri Türü Açıklama
Sıralanan süre EventHubsHeaders#ENQUEUED_TIME Anlık Liste Olay Hub'ı bölümünde her olayın ne zaman sıralandığının UTC olarak anlık listesi.
Atlanacak sayı EventHubsHeaders#OFFSET Uzun Listesi İlişkili Olay Hub'ı bölümünden alınan her olayın uzaklığının listesi.
Bölüm anahtarı AzureHeaders#PARTITION_KEY Dize Listesi Her olayı özgün olarak yayımlarken ayarlandıysa bölüm karma anahtarının listesi.
Numara serisi EventHubsHeaders#SEQUENCE_NUMBER Uzun Listesi İlişkili Olay Hub'ı bölümünde sıralandığında her olaya atanan sıra numarasının listesi.
Sistem özellikleri EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Harita Listesi Her olayın sistem özelliklerinin listesi.
Uygulama özellikleri EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Harita Listesi Tüm özelleştirilmiş ileti üst bilgilerinin veya olay özelliklerinin yerleştirildiği her olayın uygulama özelliklerinin listesi.

Dekont

İletiler yayımlandığında, varsa yukarıdaki toplu iş üst bilgilerinin tümü iletilerden kaldırılır.

Örnekler

Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.

Azure Service Bus ile Spring Tümleştirmesi

Temel kavramlar

Spring Integration, Spring tabanlı uygulamalarda basit mesajlaşmaya olanak tanır ve bildirim temelli bağdaştırıcılar aracılığıyla dış sistemlerle tümleştirmeyi destekler.

Azure Service Bus için Spring Integration uzantısı projesi, Azure Service Bus için gelen ve giden kanal bağdaştırıcıları sağlar.

Dekont

CompletableFuture destek API'leri 2.10.0 sürümünden kullanım dışı bırakıldı ve 4.0.0 sürümünden Reactor Core ile değiştirildi. Ayrıntılar için bkz. Javadoc.

Bağımlılık kurulumu

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Yapılandırma

Bu başlatıcı, yapılandırma seçeneklerinin aşağıdaki 2 bölümünü sağlar:

Bağlan yapılandırma özellikleri

Bu bölüm, Azure Service Bus'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.

Dekont

Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz, güvenlik sorumlusuna Azure kaynağına erişmek için yeterli iznin verildiğinden emin olmak için bkz . Microsoft Entra Id ile erişimi yetkilendirme.

spring-cloud-azure-starter-integration-servicebus Bağlan ion yapılandırılabilir özellikleri:

Özellik Türü Açıklama
spring.cloud.azure.servicebus.enabled boolean Azure Service Bus'ın etkinleştirilip etkinleştirilmediği.
spring.cloud.azure.servicebus.connection-string String Service Bus Ad Alanı bağlantı dizesi değeri.
spring.cloud.azure.servicebus.namespace String FQDN ön eki olan Service Bus Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır
spring.cloud.azure.servicebus.domain-name String Azure Service Bus Ad Alanı değerinin etki alanı adı.

Service Bus işlemci yapılandırma özellikleri

, geliştiricilerin ServiceBusInboundChannelAdapterServiceBusProcessorClient yapılandırma için kullanabileceği ServiceBusContainerProperties genel özelliklerini ServiceBusProcessorClientyapılandırmak üzere iletileri kullanmak için kullanır. ile ServiceBusInboundChannelAdapterçalışma hakkında aşağıdaki bölüme bakın.

Temel kullanım

Azure Service Bus'a ileti gönderme

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

    • bağlantı dizesi kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Dekont

için tenant-id izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.

  • Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Dekont

için tenant-id izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.

  1. Service Bus'a ServiceBusTemplate ileti göndermek için bean ile oluşturunDefaultMessageHandler, ServiceBusTemplate için varlık türünü ayarlayın. Bu örnekte örnek olarak Service Bus Kuyruğu alınıyor.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir ileti ağ geçidi bağlaması oluşturun.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Ağ geçidini kullanarak ileti gönderme.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Service Bus'tan ileti alma

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

  2. Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Service Bus'a ServiceBusMessageListenerContainer ileti almak için fasulye ile oluşturunServiceBusInboundChannelAdapter. Bu örnekte örnek olarak Service Bus Kuyruğu alınıyor.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. ile daha önce oluşturduğumuz ileti kanalı aracılığıyla bir ileti alıcı bağlaması ServiceBusInboundChannelAdapter oluşturun.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

ObjectMapper'ı özelleştirmek için ServiceBusMessageConverter'ı yapılandırma

ServiceBusMessageConverter , kullanıcıların ObjectMapperözelleştirmesine izin vermek için yapılandırılabilir bir fasulye olarak yapılır.

Service Bus ileti üst bilgileri

Birden çok Spring üst bilgisi sabitine eşlenebilen bazı Service Bus üst bilgileri için, farklı Spring üst bilgilerinin önceliği listelenir.

Service Bus Üst Bilgileri ile Spring Üst Bilgileri arasında eşleme:

Service Bus ileti üst bilgileri ve özellikleri Spring message header sabitleri Tür Konfigüre edilebilir Açıklama
Content type MessageHeaders#CONTENT_TYPE String Evet İletinin RFC2045 İçerik Türü tanımlayıcısı.
Bağıntı Kimliği ServiceBusMessageHeaders#CORRELATION_ID String Evet İletinin bağıntı kimliği
İleti Kimliği ServiceBusMessageHeaders#MESSAGE_ID String Evet İletinin ileti kimliği, bu üst bilgiden daha MessageHeaders#IDyüksek önceliğe sahiptir.
İleti Kimliği MessageHeaders#ID UUID Evet İletinin ileti kimliği, bu üst bilgiden daha ServiceBusMessageHeaders#MESSAGE_IDdüşük önceliğe sahiptir.
Bölüm anahtarı ServiceBusMessageHeaders#PARTITION_KEY String Evet İletiyi bölümlenmiş bir varlığa göndermek için bölüm anahtarı.
Yanıtla MessageHeaders#REPLY_CHANNEL String Evet Yanıtların gönderleneceği varlığın adresi.
Oturum kimliğini yanıtlama ServiceBusMessageHeaders#REPLY_TO_SESSION_ID String Evet İletinin ReplyToGroupId özellik değeri.
Zamanlanan sıra saati utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Evet Service Bus'ta iletinin sıralanması gereken tarih saat, bu üst bilgiden daha AzureHeaders#SCHEDULED_ENQUEUE_MESSAGEyüksek önceliğe sahiptir.
Zamanlanan sıra saati utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Tamsayı Evet İletinin Service Bus'ta sıralanması gereken tarih saat, bu üst bilgiden daha ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIMEdüşük önceliğe sahiptir.
Oturum kimliği ServiceBusMessageHeaders#SESSION_ID String Evet Oturum kullanan bir varlık için oturum IDentifier.
Yaşam süresi ServiceBusMessageHeaders#TIME_TO_LIVE Duration Evet Bu iletinin süresi dolmadan önceki süre.
İşlem ServiceBusMessageHeaders#TO String Evet yönlendirme senaryolarında gelecekte kullanılmak üzere ayrılmış ve aracı tarafından şu anda yoksayılan iletinin "to" adresi.
Subject ServiceBusMessageHeaders#SUBJECT String Evet İletinin konusu.
Teslim edilemeyen harf hatası açıklaması ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION String Hayı Teslim edilemeyen iletinin açıklaması.
Teslim edilemeyen mektup nedeni ServiceBusMessageHeaders#DEAD_LETTER_REASON String Hayı Bir iletinin yazılmama nedeni.
Teslim edilemeyen harf kaynağı ServiceBusMessageHeaders#DEAD_LETTER_SOURCE String Hayı İletinin teslim edilmediği varlık.
Teslim sayısı ServiceBusMessageHeaders#DELIVERY_COUNT uzun No Bu iletinin istemcilere teslim etme sayısı.
Sıralanmış sıra numarası ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER uzun No Service Bus tarafından bir iletiye atanan sıralı sıra numarası.
Sıralanan süre ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime No Bu iletinin Service Bus'ta sıralandığı tarih saat.
Son kullanma tarihi: ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime No Bu iletinin süresinin dolacağı tarih saat.
Kilit belirteci ServiceBusMessageHeaders#LOCK_TOKEN String Hayı Geçerli ileti için kilit belirteci.
Kilitlenene kadar ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime No Bu iletinin kilidinin süresinin dolmasına neden olan tarih saat.
Numara serisi ServiceBusMessageHeaders#SEQUENCE_NUMBER uzun No Service Bus tarafından bir iletiye atanan benzersiz numara.
State ServiceBusMessageHeaders#STATE ServiceBusMessageState No Etkin, Ertelenmiş veya Zamanlanmış olabilecek iletinin durumu.

Bölüm anahtarı desteği

Bu başlatıcı, ileti üst bilgisinde bölüm anahtarı ve oturum kimliği ayarına izin vererek Service Bus bölümlemesini destekler. Bu bölümde, iletiler için bölüm anahtarının nasıl ayarlanacağı anlatlenmektedir.

Önerilen: Üst bilginin anahtarı olarak kullanın ServiceBusMessageHeaders.PARTITION_KEY .

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Önerilmez, ancak şu anda desteklenir:AzureHeaders.PARTITION_KEY üst bilginin anahtarı olarak.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Dekont

hem hem AzureHeaders.PARTITION_KEY de ServiceBusMessageHeaders.PARTITION_KEY ileti üst bilgilerinde ayarlandığında tercih ServiceBusMessageHeaders.PARTITION_KEY edilir.

Oturum desteği

Bu örnekte, uygulamadaki bir iletinin oturum kimliğinin el ile nasıl ayarlanacağı gösterilmektedir.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Dekont

ServiceBusMessageHeaders.SESSION_ID, ileti üst bilgilerinde ayarlandığında ve farklı ServiceBusMessageHeaders.PARTITION_KEY bir üst bilgi de ayarlandığında, bölüm anahtarının değerinin üzerine yazmak için oturum kimliğinin değeri sonunda kullanılır.

Örnekler

Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.

Azure Depolama Kuyruğu ile Spring Tümleştirmesi

Temel kavramlar

Azure Kuyruk Depolama, çok sayıda iletiyi depolamaya yönelik bir hizmettir. HTTP veya HTTPS kullanarak kimliği doğrulanmış çağrılar aracılığıyla dünyanın herhangi bir yerinden iletilere erişebilirsiniz. Kuyruk iletisinin boyutu en fazla 64 KB olabilir. Kuyruk, depolama hesabının toplam kapasite sınırına kadar milyonlarca ileti içerebilir. Kuyruklar genellikle zaman uyumsuz olarak işlenmek üzere bir iş kapsamı oluşturmak için kullanılır.

Bağımlılık kurulumu

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Yapılandırma

Bu başlatıcı aşağıdaki yapılandırma seçeneklerini sağlar:

Bağlan yapılandırma özellikleri

Bu bölüm, Azure Depolama Kuyruğuna bağlanmak için kullanılan yapılandırma seçeneklerini içerir.

Dekont

Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz, güvenlik sorumlusuna Azure kaynağına erişmek için yeterli iznin verildiğinden emin olmak için bkz . Microsoft Entra Id ile erişimi yetkilendirme.

spring-cloud-azure-starter-integration-storage-queue Bağlan ion yapılandırılabilir özellikleri:

Özellik Türü Açıklama
spring.cloud.azure.storage.queue.enabled boolean Azure Depolama Kuyruğu'un etkinleştirilip etkinleştirilmediği.
spring.cloud.azure.storage.queue.connection-string String Kuyruk Ad Alanı bağlantı dizesi değerini Depolama.
spring.cloud.azure.storage.queue.accountName String Kuyruk hesabı adını Depolama.
spring.cloud.azure.storage.queue.accountKey String Kuyruk hesabı anahtarını Depolama.
spring.cloud.azure.storage.queue.endpoint String Kuyruk hizmeti uç noktasını Depolama.
spring.cloud.azure.storage.queue.sasToken String Sas belirteci kimlik bilgileri
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion API istekleri yapılırken kullanılan QueueServiceVersion.
spring.cloud.azure.storage.queue.messageEncoding String Kuyruk iletisi kodlama.

Temel kullanım

Azure Depolama Kuyruğuna ileti gönderme

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

    • bağlantı dizesi kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Dekont

için tenant-id izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.

  • Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Dekont

için tenant-id izin verilen değerler şunlardır: common, organizations, consumersveya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.

  1. Depolama Kuyruğuna StorageQueueTemplate ileti göndermek için fasulye ile oluşturunDefaultMessageHandler.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir İleti ağ geçidi bağlaması oluşturun.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Ağ geçidini kullanarak ileti gönderme.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Azure Depolama Kuyruğundan ileti alma

  1. Kimlik bilgisi yapılandırma seçeneklerini doldurun.

  2. Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Depolama Kuyruğuna StorageQueueTemplate ileti almak için bean ile oluşturunStorageQueueMessageSource.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Daha önce oluşturduğumuz ileti kanalı aracılığıyla son adımda oluşturulan Depolama QueueMessageSource ile bir ileti alıcı bağlaması oluşturun.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Örnekler

Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.