Spring Integration için Spring Cloud Azure desteği
Bu makale şunlar için geçerlidir: ✔️ Sürüm 4.14.0 ✔️ Sürüm 5.8.0
Azure için Spring Integration Uzantısı, Java için Azure SDK tarafından sağlanan çeşitli hizmetler için Spring Integration bağdaştırıcıları sağlar. Şu Azure hizmetleri için Spring Integration desteği sunuyoruz: Event Hubs, Service Bus, Depolama Kuyruğu. Desteklenen bağdaştırıcıların listesi aşağıdadır:
spring-cloud-azure-starter-integration-eventhubs
- daha fazla bilgi için bkz. Azure Event Hubs ile Spring Tümleştirmesispring-cloud-azure-starter-integration-servicebus
- daha fazla bilgi için bkz. Azure Service Bus ile Spring Tümleştirmesispring-cloud-azure-starter-integration-storage-queue
- daha fazla bilgi için bkz. Azure Depolama Kuyruğu ile Spring Tümleştirmesi
Azure Event Hubs ile Spring Tümleştirmesi
Temel kavramlar
Azure Event Hubs, büyük bir veri akışı platformu ve olay alımı hizmetidir. Saniyede milyonlarca olayı alabilir ve işleyebilir. Bir olay hub’ına gönderilen veriler, herhangi bir gerçek zamanlı analiz sağlayıcısı ve işlem grubu oluşturma/depolama bağdaştırıcıları kullanılarak dönüştürülüp depolanabilir.
Spring Integration, Spring tabanlı uygulamalarda basit mesajlaşmaya olanak tanır ve bildirim temelli bağdaştırıcılar aracılığıyla dış sistemlerle tümleştirmeyi destekler. Bu bağdaştırıcılar Spring'in uzaktan iletişim, mesajlaşma ve zamanlama desteği konusunda daha yüksek düzeyde soyutlama sağlar. Event Hubs için Spring Tümleştirmesi uzantısı projesi, Azure Event Hubs için gelen ve giden kanal bağdaştırıcıları ve ağ geçitleri sağlar.
Dekont
RxJava destek API'leri 4.0.0 sürümünden bırakılır. Ayrıntılar için bkz. Javadoc.
Tüketici grubu
Event Hubs, tüketici grubu için Apache Kafka ile benzer destek sağlar ancak biraz farklı mantık sunar. Kafka, işlenen tüm uzaklıkları aracıda depolasa da, el ile işlenen Event Hubs iletilerinin uzaklıklarını depolamanız gerekir. Event Hubs SDK'sı, bu tür uzaklıkları Azure Depolama içinde depolama işlevi sağlar.
Bölümleme desteği
Event Hubs, Kafka ile benzer bir fiziksel bölüm kavramı sağlar. Ancak Kafka'nın tüketiciler ve bölümler arasında otomatik yeniden dengelemeden farklı olarak Event Hubs bir tür önleyici mod sağlar. Depolama hesabı, hangi bölümün hangi tüketiciye ait olduğunu belirlemek için kira görevi görür. Yeni bir tüketici başladığında, iş yükü dengelemeyi başarmak için çoğu ağır yüklü tüketiciden bazı bölümleri çalmaya çalışır.
Geliştiriciler, yük dengeleme stratejisini belirtmek için yapılandırma için kullanabilir EventHubsContainerProperties
. yapılandırma EventHubsContainerProperties
örneği için aşağıdaki bölüme bakın.
Batch tüketici desteği
, EventHubsInboundChannelAdapter
toplu işlem kullanma modunu destekler. Bunu etkinleştirmek için kullanıcılar bir örneği oluştururken EventHubsInboundChannelAdapter
olduğu gibi ListenerMode.BATCH
dinleyici modunu belirtebilir.
Etkinleştirildiğinde, yükü toplu olayların listesi olan bir İleti alınır ve aşağı akış kanalına geçirilir. Her ileti üst bilgisi, içeriği her olaydan ayrıştırılan ilişkili üst bilgi değeri olan bir liste olarak da dönüştürülür. Bölüm kimliği, denetim noktası oluşturucu ve son sıraya alınan özelliklerin ortak üst bilgileri için, olay grubunun tamamı için tek bir değer olarak sunulurlar ve aynı değeri paylaşırlar. Daha fazla bilgi için Event Hubs İleti Üst Bilgileri bölümüne bakın.
Dekont
Denetim noktası üst bilgisi yalnızca EL ile denetim noktası modu kullanıldığında bulunur.
Toplu iş tüketicisinin denetim noktası oluşturma işlemi iki modu destekler: BATCH
ve MANUAL
. BATCH
modu, olay toplu işleminin tamamını alındıktan sonra birlikte kontrol etmek için otomatik bir denetim noktası modudur. MANUAL
modu, kullanıcılara göre olayları kontrol etmektir. Kullanıldığında, Denetim Noktası Oluşturucu ileti üst bilgisine geçirilir ve kullanıcılar denetim noktası oluşturma işlemi yapmak için bunu kullanabilir.
Toplu işlem kullanan ilke ve max-wait-time
özellikleri max-size
tarafından belirtilebilir; burada max-size
gerekli bir özelliktir ve max-wait-time
isteğe bağlıdır.
Geliştiriciler, toplu kullanım stratejisini belirtmek için yapılandırma için kullanabilir EventHubsContainerProperties
. yapılandırma EventHubsContainerProperties
örneği için aşağıdaki bölüme bakın.
Bağımlılık kurulumu
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Yapılandırma
Bu başlatıcı, yapılandırma seçeneklerinin aşağıdaki 3 bölümünü sağlar:
Bağlan ion Yapılandırma Özellikleri
Bu bölüm, Azure Event Hubs'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.
Dekont
Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz, güvenlik sorumlusuna Azure kaynağına erişmek için yeterli iznin verildiğinden emin olmak için bkz . Microsoft Entra Id ile erişimi yetkilendirme.
spring-cloud-azure-starter-integration-eventhubs Bağlan ion yapılandırılabilir özellikleri:
Özellik | Türü | Açıklama |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolean | Azure Event Hubs'ın etkinleştirilip etkinleştirilmediği. |
spring.cloud.azure.eventhubs.connection-string | String | Event Hubs Ad Alanı bağlantı dizesi değeri. |
spring.cloud.azure.eventhubs.namespace | String | FQDN'nin ön eki olan Event Hubs Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır |
spring.cloud.azure.eventhubs.domain-name | String | Azure Event Hubs Ad Alanı değerinin etki alanı adı. |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | Özel Uç Nokta adresi. |
spring.cloud.azure.eventhubs.shared-connection | Boolean | Temel alınan EventProcessorClient ve EventHubProducerAsyncClient'ın aynı bağlantıyı kullanıp kullanmadığı. Varsayılan olarak, oluşturulan her Olay Hub'ı istemcisi için yeni bir bağlantı oluşturulur ve oluşturulur. |
Denetim Noktası Yapılandırma Özellikleri
Bu bölüm, bölüm sahipliğini ve denetim noktası bilgilerini kalıcı hale etmek için kullanılan Depolama Blobları hizmetinin yapılandırma seçeneklerini içerir.
Dekont
Sürüm 4.0.0'dan itibaren spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists özelliği el ile etkinleştirilmediğinde Depolama kapsayıcısı otomatik olarak oluşturulmaz.
Spring-cloud-azure-starter-integration-eventhubs'ın yapılandırılabilir özelliklerini denetleme:
Özellik | Türü | Açıklama |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean | Mevcut değilse kapsayıcı oluşturmaya izin verilip verilmeyeceği. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | Depolama hesabının adı. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | hesap erişim anahtarını Depolama. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | kapsayıcı adını Depolama. |
Yaygın Azure Hizmeti SDK yapılandırma seçenekleri Depolama Blob denetim noktası deposu için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasında kullanıma sunulmuştur ve birleştirilmiş ön ek veya ön ekiyle spring.cloud.azure.
spring.cloud.azure.eventhubs.processor.checkpoint-store
yapılandırılabilir.
Event Hub işlemci yapılandırma özellikleri
EventProcessorClient
, EventHubsInboundChannelAdapter
bir olay hub'ından gelen iletileri kullanmak, geliştiricilerin EventProcessorClient
yapılandırma için kullanabileceği EventHubsContainerProperties
genel özelliklerini yapılandırmak için kullanır. ile EventHubsInboundChannelAdapter
çalışma hakkında aşağıdaki bölüme bakın.
Temel kullanım
Azure Event Hubs'a ileti gönderme
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
bağlantı dizesi kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Dekont
için tenant-id
izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.
Event Hubs'a
EventHubsTemplate
ileti göndermek için bean ile oluşturunDefaultMessageHandler
.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir ileti ağ geçidi bağlaması oluşturun.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Ağ geçidini kullanarak ileti gönderme.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Azure Event Hubs'dan ileti alma
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Event Hubs'dan
EventHubsMessageListenerContainer
ileti almak için bean ile oluşturunEventHubsInboundChannelAdapter
.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Daha önce oluşturulan ileti kanalı aracılığıyla EventHubsInboundChannelAdapter ile bir ileti alıcısı bağlaması oluşturun.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
ObjectMapper'ı özelleştirmek için EventHubsMessageConverter'ı yapılandırma
EventHubsMessageConverter
, kullanıcıların ObjectMapper'ı özelleştirmesine izin vermek için yapılandırılabilir bir fasulye olarak yapılır.
Batch tüketici desteği
Event Hubs'dan gelen iletileri toplu olarak kullanmak yukarıdaki örneğe benzer; ayrıca, kullanıcılar için EventHubsInboundChannelAdapter
toplu işlem kullanan ilgili yapılandırma seçeneklerini ayarlamalıdır.
oluşturulurken EventHubsInboundChannelAdapter
dinleyici modu olarak BATCH
ayarlanmalıdır. bean EventHubsMessageListenerContainer
oluştururken, denetim noktası modunu veya BATCH
olarak MANUAL
ayarlayın; toplu iş seçenekleri gerektiği gibi yapılandırılabilir.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Event Hubs ileti üst bilgileri
Aşağıdaki tabloda Event Hubs ileti özelliklerinin Spring ileti üst bilgileriyle nasıl eşlendiği gösterilmektedir. Azure Event Hubs için ileti olarak event
adlandırılır.
Kayıt Dinleyicisi Modunda Event Hubs İletisi / Olay Özellikleri ile Spring message üst bilgileri arasında eşleme:
Event Hubs Olay Özellikleri | Spring Message Header Sabitleri | Türü | Açıklama |
---|---|---|---|
Sıralanan süre | EventHubsHeaders#ENQUEUED_TIME | Anlık | Olayın Olay Hub'ı bölümünde ne zaman sıralandığına ilişkin utc olarak anlık değer. |
Atlanacak sayı | EventHubsHeaders#OFFSET | Uzun | İlişkili Olay Hub'ı bölümünden alınan olayın uzaklığı. |
Bölüm anahtarı | AzureHeaders#PARTITION_KEY | String | Olay ilk yayımlandığında ayarlandıysa bölüm karma anahtarı. |
Bölüm Kimliği | AzureHeaders#RAW_PARTITION_ID | String | Olay Hub'ının bölüm kimliği. |
Numara serisi | EventHubsHeaders#SEQUENCE_NUMBER | Uzun | İlişkili Olay Hub'ı bölümünde sıralandığında olaya atanan sıra numarası. |
Son sıralanan olay özellikleri | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Bu bölümdeki son sıraya alınan olayın özellikleri. |
NA | AzureHeaders#CHECKPOINTER | Denetim Noktası Oluşturucu | Belirli bir iletinin denetim noktası üst bilgisi. |
Kullanıcılar, her olayın ilgili bilgileri için ileti üst bilgilerini ayrıştırabilir. Olay için bir ileti üst bilgisi ayarlamak için, tüm özelleştirilmiş üst bilgiler bir olayın uygulama özelliği olarak konur ve burada üst bilgi özellik anahtarı olarak ayarlanır. Event Hubs'dan olaylar alındığında, tüm uygulama özellikleri ileti üst bilgisine dönüştürülür.
Dekont
Bölüm anahtarı, sıralanan süre, uzaklık ve sıra numarasının ileti üst bilgilerinin el ile ayarlanması desteklenmez.
Toplu iş tüketici modu etkinleştirildiğinde, toplu iletilerin belirli üst bilgileri, her bir Event Hubs olayından değerlerin listesini içeren aşağıdaki şekilde listelenir.
Toplu dinleyici modunda Event Hubs İletisi / Olay Özellikleri ile Spring message üst bilgileri arasında eşleme:
Event Hubs Olay Özellikleri | Spring Batch İleti Üst Bilgi Sabitleri | Türü | Açıklama |
---|---|---|---|
Sıralanan süre | EventHubsHeaders#ENQUEUED_TIME | Anlık Liste | Olay Hub'ı bölümünde her olayın ne zaman sıralandığının UTC olarak anlık listesi. |
Atlanacak sayı | EventHubsHeaders#OFFSET | Uzun Listesi | İlişkili Olay Hub'ı bölümünden alınan her olayın uzaklığının listesi. |
Bölüm anahtarı | AzureHeaders#PARTITION_KEY | Dize Listesi | Her olayı özgün olarak yayımlarken ayarlandıysa bölüm karma anahtarının listesi. |
Numara serisi | EventHubsHeaders#SEQUENCE_NUMBER | Uzun Listesi | İlişkili Olay Hub'ı bölümünde sıralandığında her olaya atanan sıra numarasının listesi. |
Sistem özellikleri | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Harita Listesi | Her olayın sistem özelliklerinin listesi. |
Uygulama özellikleri | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Harita Listesi | Tüm özelleştirilmiş ileti üst bilgilerinin veya olay özelliklerinin yerleştirildiği her olayın uygulama özelliklerinin listesi. |
Dekont
İletiler yayımlandığında, varsa yukarıdaki toplu iş üst bilgilerinin tümü iletilerden kaldırılır.
Örnekler
Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.
Azure Service Bus ile Spring Tümleştirmesi
Temel kavramlar
Spring Integration, Spring tabanlı uygulamalarda basit mesajlaşmaya olanak tanır ve bildirim temelli bağdaştırıcılar aracılığıyla dış sistemlerle tümleştirmeyi destekler.
Azure Service Bus için Spring Integration uzantısı projesi, Azure Service Bus için gelen ve giden kanal bağdaştırıcıları sağlar.
Dekont
CompletableFuture destek API'leri 2.10.0 sürümünden kullanım dışı bırakıldı ve 4.0.0 sürümünden Reactor Core ile değiştirildi. Ayrıntılar için bkz. Javadoc.
Bağımlılık kurulumu
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Yapılandırma
Bu başlatıcı, yapılandırma seçeneklerinin aşağıdaki 2 bölümünü sağlar:
Bağlan yapılandırma özellikleri
Bu bölüm, Azure Service Bus'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.
Dekont
Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz, güvenlik sorumlusuna Azure kaynağına erişmek için yeterli iznin verildiğinden emin olmak için bkz . Microsoft Entra Id ile erişimi yetkilendirme.
spring-cloud-azure-starter-integration-servicebus Bağlan ion yapılandırılabilir özellikleri:
Özellik | Türü | Açıklama |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolean | Azure Service Bus'ın etkinleştirilip etkinleştirilmediği. |
spring.cloud.azure.servicebus.connection-string | String | Service Bus Ad Alanı bağlantı dizesi değeri. |
spring.cloud.azure.servicebus.namespace | String | FQDN ön eki olan Service Bus Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır |
spring.cloud.azure.servicebus.domain-name | String | Azure Service Bus Ad Alanı değerinin etki alanı adı. |
Service Bus işlemci yapılandırma özellikleri
, geliştiricilerin ServiceBusInboundChannelAdapter
ServiceBusProcessorClient
yapılandırma için kullanabileceği ServiceBusContainerProperties
genel özelliklerini ServiceBusProcessorClient
yapılandırmak üzere iletileri kullanmak için kullanır. ile ServiceBusInboundChannelAdapter
çalışma hakkında aşağıdaki bölüme bakın.
Temel kullanım
Azure Service Bus'a ileti gönderme
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
bağlantı dizesi kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Dekont
için tenant-id
izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.
Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Dekont
için tenant-id
izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.
Service Bus'a
ServiceBusTemplate
ileti göndermek için bean ile oluşturunDefaultMessageHandler
, ServiceBusTemplate için varlık türünü ayarlayın. Bu örnekte örnek olarak Service Bus Kuyruğu alınıyor.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir ileti ağ geçidi bağlaması oluşturun.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Ağ geçidini kullanarak ileti gönderme.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Azure Service Bus'tan ileti alma
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Service Bus'a
ServiceBusMessageListenerContainer
ileti almak için fasulye ile oluşturunServiceBusInboundChannelAdapter
. Bu örnekte örnek olarak Service Bus Kuyruğu alınıyor.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
ile daha önce oluşturduğumuz ileti kanalı aracılığıyla bir ileti alıcı bağlaması
ServiceBusInboundChannelAdapter
oluşturun.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
ObjectMapper'ı özelleştirmek için ServiceBusMessageConverter'ı yapılandırma
ServiceBusMessageConverter
, kullanıcıların ObjectMapper
özelleştirmesine izin vermek için yapılandırılabilir bir fasulye olarak yapılır.
Service Bus ileti üst bilgileri
Birden çok Spring üst bilgisi sabitine eşlenebilen bazı Service Bus üst bilgileri için, farklı Spring üst bilgilerinin önceliği listelenir.
Service Bus Üst Bilgileri ile Spring Üst Bilgileri arasında eşleme:
Service Bus ileti üst bilgileri ve özellikleri | Spring message header sabitleri | Tür | Konfigüre edilebilir | Açıklama |
---|---|---|---|---|
Content type | MessageHeaders#CONTENT_TYPE |
String | Evet | İletinin RFC2045 İçerik Türü tanımlayıcısı. |
Bağıntı Kimliği | ServiceBusMessageHeaders#CORRELATION_ID |
String | Evet | İletinin bağıntı kimliği |
İleti Kimliği | ServiceBusMessageHeaders#MESSAGE_ID |
String | Evet | İletinin ileti kimliği, bu üst bilgiden daha MessageHeaders#ID yüksek önceliğe sahiptir. |
İleti Kimliği | MessageHeaders#ID |
UUID | Evet | İletinin ileti kimliği, bu üst bilgiden daha ServiceBusMessageHeaders#MESSAGE_ID düşük önceliğe sahiptir. |
Bölüm anahtarı | ServiceBusMessageHeaders#PARTITION_KEY |
String | Evet | İletiyi bölümlenmiş bir varlığa göndermek için bölüm anahtarı. |
Yanıtla | MessageHeaders#REPLY_CHANNEL |
String | Evet | Yanıtların gönderleneceği varlığın adresi. |
Oturum kimliğini yanıtlama | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
String | Evet | İletinin ReplyToGroupId özellik değeri. |
Zamanlanan sıra saati utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Evet | Service Bus'ta iletinin sıralanması gereken tarih saat, bu üst bilgiden daha AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE yüksek önceliğe sahiptir. |
Zamanlanan sıra saati utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Tamsayı | Evet | İletinin Service Bus'ta sıralanması gereken tarih saat, bu üst bilgiden daha ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME düşük önceliğe sahiptir. |
Oturum kimliği | ServiceBusMessageHeaders#SESSION_ID |
String | Evet | Oturum kullanan bir varlık için oturum IDentifier. |
Yaşam süresi | ServiceBusMessageHeaders#TIME_TO_LIVE |
Duration | Evet | Bu iletinin süresi dolmadan önceki süre. |
İşlem | ServiceBusMessageHeaders#TO |
String | Evet | yönlendirme senaryolarında gelecekte kullanılmak üzere ayrılmış ve aracı tarafından şu anda yoksayılan iletinin "to" adresi. |
Subject | ServiceBusMessageHeaders#SUBJECT |
String | Evet | İletinin konusu. |
Teslim edilemeyen harf hatası açıklaması | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
String | Hayı | Teslim edilemeyen iletinin açıklaması. |
Teslim edilemeyen mektup nedeni | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
String | Hayı | Bir iletinin yazılmama nedeni. |
Teslim edilemeyen harf kaynağı | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
String | Hayı | İletinin teslim edilmediği varlık. |
Teslim sayısı | ServiceBusMessageHeaders#DELIVERY_COUNT |
uzun | No | Bu iletinin istemcilere teslim etme sayısı. |
Sıralanmış sıra numarası | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
uzun | No | Service Bus tarafından bir iletiye atanan sıralı sıra numarası. |
Sıralanan süre | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | No | Bu iletinin Service Bus'ta sıralandığı tarih saat. |
Son kullanma tarihi: | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | No | Bu iletinin süresinin dolacağı tarih saat. |
Kilit belirteci | ServiceBusMessageHeaders#LOCK_TOKEN |
String | Hayı | Geçerli ileti için kilit belirteci. |
Kilitlenene kadar | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | No | Bu iletinin kilidinin süresinin dolmasına neden olan tarih saat. |
Numara serisi | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
uzun | No | Service Bus tarafından bir iletiye atanan benzersiz numara. |
State | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | No | Etkin, Ertelenmiş veya Zamanlanmış olabilecek iletinin durumu. |
Bölüm anahtarı desteği
Bu başlatıcı, ileti üst bilgisinde bölüm anahtarı ve oturum kimliği ayarına izin vererek Service Bus bölümlemesini destekler. Bu bölümde, iletiler için bölüm anahtarının nasıl ayarlanacağı anlatlenmektedir.
Önerilen: Üst bilginin anahtarı olarak kullanın ServiceBusMessageHeaders.PARTITION_KEY
.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Önerilmez, ancak şu anda desteklenir:AzureHeaders.PARTITION_KEY
üst bilginin anahtarı olarak.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Dekont
hem hem AzureHeaders.PARTITION_KEY
de ServiceBusMessageHeaders.PARTITION_KEY
ileti üst bilgilerinde ayarlandığında tercih ServiceBusMessageHeaders.PARTITION_KEY
edilir.
Oturum desteği
Bu örnekte, uygulamadaki bir iletinin oturum kimliğinin el ile nasıl ayarlanacağı gösterilmektedir.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Dekont
ServiceBusMessageHeaders.SESSION_ID
, ileti üst bilgilerinde ayarlandığında ve farklı ServiceBusMessageHeaders.PARTITION_KEY
bir üst bilgi de ayarlandığında, bölüm anahtarının değerinin üzerine yazmak için oturum kimliğinin değeri sonunda kullanılır.
Örnekler
Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.
Azure Depolama Kuyruğu ile Spring Tümleştirmesi
Temel kavramlar
Azure Kuyruk Depolama, çok sayıda iletiyi depolamaya yönelik bir hizmettir. HTTP veya HTTPS kullanarak kimliği doğrulanmış çağrılar aracılığıyla dünyanın herhangi bir yerinden iletilere erişebilirsiniz. Kuyruk iletisinin boyutu en fazla 64 KB olabilir. Kuyruk, depolama hesabının toplam kapasite sınırına kadar milyonlarca ileti içerebilir. Kuyruklar genellikle zaman uyumsuz olarak işlenmek üzere bir iş kapsamı oluşturmak için kullanılır.
Bağımlılık kurulumu
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Yapılandırma
Bu başlatıcı aşağıdaki yapılandırma seçeneklerini sağlar:
Bağlan yapılandırma özellikleri
Bu bölüm, Azure Depolama Kuyruğuna bağlanmak için kullanılan yapılandırma seçeneklerini içerir.
Dekont
Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz, güvenlik sorumlusuna Azure kaynağına erişmek için yeterli iznin verildiğinden emin olmak için bkz . Microsoft Entra Id ile erişimi yetkilendirme.
spring-cloud-azure-starter-integration-storage-queue Bağlan ion yapılandırılabilir özellikleri:
Özellik | Türü | Açıklama |
---|---|---|
spring.cloud.azure.storage.queue.enabled | boolean | Azure Depolama Kuyruğu'un etkinleştirilip etkinleştirilmediği. |
spring.cloud.azure.storage.queue.connection-string | String | Kuyruk Ad Alanı bağlantı dizesi değerini Depolama. |
spring.cloud.azure.storage.queue.accountName | String | Kuyruk hesabı adını Depolama. |
spring.cloud.azure.storage.queue.accountKey | String | Kuyruk hesabı anahtarını Depolama. |
spring.cloud.azure.storage.queue.endpoint | String | Kuyruk hizmeti uç noktasını Depolama. |
spring.cloud.azure.storage.queue.sasToken | String | Sas belirteci kimlik bilgileri |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | API istekleri yapılırken kullanılan QueueServiceVersion. |
spring.cloud.azure.storage.queue.messageEncoding | String | Kuyruk iletisi kodlama. |
Temel kullanım
Azure Depolama Kuyruğuna ileti gönderme
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
bağlantı dizesi kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Dekont
için tenant-id
izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.
Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Dekont
için tenant-id
izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için Hata AADSTS50020 - Kimlik sağlayıcısı kullanıcı hesabı kiracıda yok hatasının Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz . Microsoft Entra Id'de tek kiracılı uygulamayı çok kiracılıya dönüştürme.
Depolama Kuyruğuna
StorageQueueTemplate
ileti göndermek için fasulye ile oluşturunDefaultMessageHandler
.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir İleti ağ geçidi bağlaması oluşturun.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Ağ geçidini kullanarak ileti gönderme.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Azure Depolama Kuyruğundan ileti alma
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Depolama Kuyruğuna
StorageQueueTemplate
ileti almak için bean ile oluşturunStorageQueueMessageSource
.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Daha önce oluşturduğumuz ileti kanalı aracılığıyla son adımda oluşturulan Depolama QueueMessageSource ile bir ileti alıcı bağlaması oluşturun.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Örnekler
Daha fazla bilgi için GitHub'daki azure-spring-boot-samples deposuna bakın.