Spring Tümleştirmesi için Spring Cloud Azure desteği
Bu makale şunlar için geçerlidir:✅ Sürüm 4.19.0 ✅ Sürüm 5.19.0
Azure için Spring Tümleştirme Uzantısı, Javaiçin Azure SDK
-
spring-cloud-azure-starter-integration-eventhubs
- Daha fazla bilgi için bkz. Azure Event Hubs ile Spring Tümleştirmesi - Daha fazla bilgi için bkz. Azure Service Bus ile Spring Integration -
spring-cloud-azure-starter-integration-storage-queue
- Daha fazla bilgi için bkz. Azure Depolama Kuyruğu ile Spring Tümleştirmesi
Azure Event Hubs ile Spring Tümleştirmesi
Temel kavramlar
Azure Event Hubs, büyük bir veri akışı platformu ve olay alımı hizmetidir. Saniyede milyonlarca olay alabilir ve işleyebilir. Bir olay hub'ına gönderilen veriler, herhangi bir gerçek zamanlı analiz sağlayıcısı veya toplu işlem/depolama bağdaştırıcısı kullanılarak dönüştürülebilir ve depolanabilir.
Spring Integration, Spring tabanlı uygulamalarda basit mesajlaşmaya olanak tanır ve bildirim temelli bağdaştırıcılar aracılığıyla dış sistemlerle tümleştirmeyi destekler. Bu bağdaştırıcılar Spring'in uzaktan iletişim, mesajlaşma ve zamanlama desteği konusunda daha yüksek düzeyde soyutlama sağlar. Event Hubs için Spring Integration uzantısı projesi, Azure Event Hubs için gelen ve giden kanal bağdaştırıcıları ve ağ geçitleri sağlar.
Not
RxJava destek API'leri 4.0.0 sürümünden bırakılır. Ayrıntılar için bkz. Javadoc.
Tüketici grubu
Event Hubs, tüketici grubu için Apache Kafka ile benzer destek sağlar ancak biraz farklı mantık sunar. Kafka, işlenen tüm uzaklıkları aracıda depolasa da, el ile işlenen Event Hubs iletilerinin uzaklıklarını depolamanız gerekir. Event Hubs SDK'sı, bu tür uzaklıkları Azure Depolama'da depolama işlevi sağlar.
Bölümleme desteği
Event Hubs, Kafka ile benzer bir fiziksel bölüm kavramı sağlar. Ancak Kafka'nın tüketiciler ve bölümler arasında otomatik yeniden dengelemeden farklı olarak Event Hubs bir tür önleyici mod sağlar. Depolama hesabı, hangi bölümün hangi tüketiciye ait olduğunu belirlemek için kira görevi görür. Yeni bir tüketici başladığında, iş yükü dengelemeyi başarmak için çoğu ağır yüklü tüketiciden bazı bölümleri çalmaya çalışır.
Geliştiriciler, yük dengeleme stratejisini belirtmek için yapılandırma için EventHubsContainerProperties
kullanabilir.
Batch tüketici desteği
EventHubsInboundChannelAdapter
toplu işlem modunu destekler. Kullanıcılar bunu etkinleştirmek için ListenerMode.BATCH
örneği oluştururken dinleyici modunu EventHubsInboundChannelAdapter
olarak belirtebilir.
Etkinleştirildiğinde yükü toplu olayların listesi olan bir İletisi alınır ve aşağı akış kanalına geçirilir. Her ileti üst bilgisi, içeriği her olaydan ayrıştırılan ilişkili üst bilgi değeri olan bir liste olarak da dönüştürülür. Bölüm kimliği, denetim noktası oluşturucu ve son sıraya alınan özelliklerin ortak üst bilgileri için, olay grubunun tamamı için tek bir değer olarak sunulurlar ve aynı değeri paylaşırlar. Daha fazla bilgi için Event Hubs İleti Üst Bilgileri bölümüne bakın.
Not
Denetim noktası üst bilgisi yalnızca MANUAL denetim noktası modu kullanıldığında bulunur.
Toplu tüketici denetim noktası oluşturma iki modu destekler: BATCH
ve MANUAL
.
BATCH
modu, olay toplu işleminin tamamını alındıktan sonra birlikte kontrol etmek için otomatik bir denetim noktası modudur.
MANUAL
modu, kullanıcılara göre olayları kontrol etmektir. Kullanıldığında, Denetim Noktası Denetleyicisi ileti üst bilgisine geçirilir ve kullanıcılar bunu denetim noktası oluşturma amacıyla kullanabilir.
Toplu işlem kullanan ilke, max-size
ve max-wait-time
özellikleri tarafından belirtilebilir; burada max-size
gerekli bir özelliktir ve max-wait-time
isteğe bağlıdır.
Geliştiriciler toplu kullanım stratejisini belirtmek için yapılandırma için EventHubsContainerProperties
kullanabilir.
Bağımlılık kurulumu
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfigürasyon
Bu başlatıcı, yapılandırma seçeneklerinin aşağıdaki 3 bölümünü sağlar:
Bağlantı Yapılandırma Özellikleri
Bu bölüm, Azure Event Hubs'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.
Not
Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme
spring-cloud-azure-starter-integration-eventhubs'ın bağlantı yapılandırılabilir özellikleri:
Mülk | Tür | Açıklama |
---|---|---|
spring.cloud.azure.eventhubs.enabled | Boolean | Azure Event Hubs'ın etkinleştirilip etkinleştirilmediği. |
spring.cloud.azure.eventhubs.connection-string |
Dizgi | Event Hubs Ad Alanı bağlantı dizesi değeri. |
spring.cloud.azure.eventhubs.namespace | Dizgi | FQDN'nin ön eki olan Event Hubs Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır |
spring.cloud.azure.eventhubs.domain-name | Dizgi | Azure Event Hubs Ad Alanı değerinin etki alanı adı. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Dizgi | Özel Uç Nokta adresi. |
spring.cloud.azure.eventhubs.shared-connection |
Boolean | Temel alınan EventProcessorClient ve EventHubProducerAsyncClient'ın aynı bağlantıyı kullanıp kullanmadığı. Varsayılan olarak, oluşturulan her Olay Hub'ı istemcisi için yeni bir bağlantı oluşturulur ve oluşturulur. |
Denetim Noktası Yapılandırma Özellikleri
Bu bölüm, bölüm sahipliğini ve denetim noktası bilgilerini kalıcı hale etmek için kullanılan Depolama Blobları hizmetinin yapılandırma seçeneklerini içerir.
Not
sürüm 4.0.0'dan itibaren, spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists özelliği el ile etkinleştirilmediğinde depolama kapsayıcısı otomatik olarak oluşturulmaz.
Spring-cloud-azure-starter-integration-eventhubs'ın yapılandırılabilir özelliklerini denetleme:
Mülk | Tür | Açıklama |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolean | Mevcut değilse kapsayıcı oluşturmaya izin verilip verilmeyeceği. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name |
Dizgi | Depolama hesabının adı. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Dizgi | Depolama hesabı erişim anahtarı. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Dizgi | Depolama kapsayıcısı adı. |
Yaygın Azure Hizmeti SDK yapılandırma seçenekleri, Depolama Blobu denetim noktası deposu için de yapılandırılabilir. Desteklenen yapılandırma seçenekleri Spring Cloud Azure yapılandırmasunulmuştur ve birleştirilmiş ön ek spring.cloud.azure.
veya spring.cloud.azure.eventhubs.processor.checkpoint-store
ön eki ile yapılandırılabilir.
Event Hub işlemci yapılandırma özellikleri
EventHubsInboundChannelAdapter
, EventProcessorClient
kullanarak bir olay hub'ından gelen iletileri kullanarak bir EventProcessorClient
genel özelliklerini yapılandırabilir ve geliştiriciler yapılandırma için EventHubsContainerProperties
kullanabilir.
Temel kullanım
Azure Event Hubs'a ileti gönderme
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Not
tenant-id
için izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.
Event Hubs'a ileti göndermek için
DefaultMessageHandler
çekirdeğiyleEventHubsTemplate
oluşturun.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir ileti ağ geçidi bağlaması oluşturun.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Ağ geçidini kullanarak ileti gönderme.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Azure Event Hubs'dan ileti alma
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Event Hubs'dan ileti almak için
EventHubsInboundChannelAdapter
çekirdeğiyleEventHubsMessageListenerContainer
oluşturun.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Daha önce oluşturulan ileti kanalı aracılığıyla EventHubsInboundChannelAdapter ile bir ileti alıcısı bağlaması oluşturun.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
ObjectMapper'ı özelleştirmek için EventHubsMessageConverter'ı yapılandırma
EventHubsMessageConverter
, kullanıcıların ObjectMapper'ı özelleştirmesine izin vermek için yapılandırılabilir bir fasulye olarak yapılır.
Batch tüketici desteği
Event Hubs'tan gelen iletileri toplu olarak kullanmak yukarıdaki örneğe benzerdir; ayrıca, kullanıcıların EventHubsInboundChannelAdapter
için toplu işlem kullanan ilgili yapılandırma seçeneklerini ayarlaması gerekir.
EventHubsInboundChannelAdapter
oluştururken dinleyici modu BATCH
olarak ayarlanmalıdır.
EventHubsMessageListenerContainer
çekirdeği oluşturduğunuzda, denetim noktası modunu MANUAL
veya BATCH
olarak ayarlayın; toplu iş seçenekleri gerektiği gibi yapılandırılabilir.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Event Hubs ileti üst bilgileri
Aşağıdaki tabloda Event Hubs ileti özelliklerinin Spring ileti üst bilgileriyle nasıl eşlendiği gösterilmektedir. Azure Event Hubs için ileti event
olarak adlandırılır.
Kayıt Dinleyicisi Modunda Event Hubs İletisi / Olay Özellikleri ile Spring message üst bilgileri arasında eşleme:
Event Hubs Olay Özellikleri | Spring Message Header Sabitleri | Tür | Açıklama |
---|---|---|---|
Sıralanan süre | EventHubsHeaders#ENQUEUED_TIME | Dakika | Olayın Olay Hub'ı bölümünde ne zaman sıralandığına ilişkin utc olarak anlık değer. |
Ofset | EventHubsHeaders#OFFSET | Uzun | İlişkili Olay Hub'ı bölümünden alınan olayın uzaklığı. |
Bölüm anahtarı | AzureHeaders#PARTITION_KEY | Dizgi | Olay ilk yayımlandığında ayarlandıysa bölüm karma anahtarı. |
Bölüm Kimliği | AzureHeaders#RAW_PARTITION_ID | Dizgi | Olay Hub'ının bölüm kimliği. |
Sıra numarası | EventHubsHeaders#SEQUENCE_NUMBER | Uzun | İlişkili Olay Hub'ı bölümünde sıralandığında olaya atanan sıra numarası. |
Son sıralanan olay özellikleri | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Bu bölümdeki son sıraya alınan olayın özellikleri. |
NA | AzureHeaders#CHECKPOINTER | Denetim Noktası Oluşturucu | Belirli bir iletinin denetim noktası üst bilgisi. |
Kullanıcılar, her olayın ilgili bilgileri için ileti üst bilgilerini ayrıştırabilir. Olay için bir ileti üst bilgisi ayarlamak için, tüm özelleştirilmiş üst bilgiler bir olayın uygulama özelliği olarak konur ve burada üst bilgi özellik anahtarı olarak ayarlanır. Event Hubs'dan olaylar alındığında, tüm uygulama özellikleri ileti üst bilgisine dönüştürülür.
Not
Bölüm anahtarı, sıralanan süre, uzaklık ve sıra numarasının ileti üst bilgilerinin el ile ayarlanması desteklenmez.
Toplu iş tüketici modu etkinleştirildiğinde, toplu iletilerin belirli üst bilgileri, her bir Event Hubs olayından değerlerin listesini içeren aşağıdaki şekilde listelenir.
Toplu dinleyici modunda Event Hubs İletisi / Olay Özellikleri ile Spring message üst bilgileri arasında eşleme:
Event Hubs Olay Özellikleri | Spring Batch İleti Üst Bilgi Sabitleri | Tür | Açıklama |
---|---|---|---|
Sıralanan süre | EventHubsHeaders#ENQUEUED_TIME | Anlık Liste | Olay Hub'ı bölümünde her olayın ne zaman sıralandığının UTC olarak anlık listesi. |
Ofset | EventHubsHeaders#OFFSET | Uzun Listesi | İlişkili Olay Hub'ı bölümünden alınan her olayın uzaklığının listesi. |
Bölüm anahtarı | AzureHeaders#PARTITION_KEY | Dize Listesi | Her olayı özgün olarak yayımlarken ayarlandıysa bölüm karma anahtarının listesi. |
Sıra numarası | EventHubsHeaders#SEQUENCE_NUMBER | Uzun Listesi | İlişkili Olay Hub'ı bölümünde sıralandığında her olaya atanan sıra numarasının listesi. |
Sistem özellikleri | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Harita Listesi | Her olayın sistem özelliklerinin listesi. |
Uygulama özellikleri | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Harita Listesi | Tüm özelleştirilmiş ileti üst bilgilerinin veya olay özelliklerinin yerleştirildiği her olayın uygulama özelliklerinin listesi. |
Not
İletiler yayımlandığında, varsa yukarıdaki toplu iş üst bilgilerinin tümü iletilerden kaldırılır.
Örnekleri
Daha fazla bilgi için GitHub'daki azure-spring-boot-samples
Azure Service Bus ile Spring Tümleştirmesi
Temel kavramlar
Spring Integration, Spring tabanlı uygulamalarda basit mesajlaşmaya olanak tanır ve bildirim temelli bağdaştırıcılar aracılığıyla dış sistemlerle tümleştirmeyi destekler.
Azure Service Bus için Spring Integration uzantısı projesi, Azure Service Bus için gelen ve giden kanal bağdaştırıcıları sağlar.
Not
CompletableFuture destek API'leri 2.10.0 sürümünden kullanım dışı bırakıldı ve 4.0.0 sürümünden Reactor Core ile değiştirildi. Ayrıntılar için bkz. Javadoc.
Bağımlılık kurulumu
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfigürasyon
Bu başlatıcı, yapılandırma seçeneklerinin aşağıdaki 2 bölümünü sağlar:
Bağlantı yapılandırma özellikleri
Bu bölüm, Azure Service Bus'a bağlanmak için kullanılan yapılandırma seçeneklerini içerir.
Not
Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme
spring-cloud-azure-starter-integration-servicebus'ın bağlantı yapılandırılabilir özellikleri:
Mülk | Tür | Açıklama |
---|---|---|
spring.cloud.azure.servicebus.enabled |
Boolean | Azure Service Bus'ın etkinleştirilip etkinleştirilmediği. |
spring.cloud.azure.servicebus.connection-string | Dizgi | Service Bus Ad Alanı bağlantı dizesi değeri. |
spring.cloud.azure.servicebus.custom-endpoint-address | Dizgi | Service Bus'a bağlanırken kullanılacak özel uç nokta adresi. |
spring.cloud.azure.servicebus.namespace |
Dizgi | FQDN ön eki olan Service Bus Ad Alanı değeri. FQDN, NamespaceName.DomainName dosyasından oluşmalıdır |
spring.cloud.azure.servicebus.domain-name | Dizgi | Azure Service Bus Ad Alanı değerinin etki alanı adı. |
Service Bus işlemci yapılandırma özellikleri
ServiceBusInboundChannelAdapter
, bir ServiceBusProcessorClient
genel özelliklerini yapılandırmak üzere iletileri kullanmak için ServiceBusProcessorClient
kullanır. Geliştiriciler yapılandırma için ServiceBusContainerProperties
kullanabilir.
Temel kullanım
Azure Service Bus'a ileti gönderme
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Not
tenant-id
için izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.
Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Not
tenant-id
için izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.
Service Bus'a ileti göndermek için
DefaultMessageHandler
bean ileServiceBusTemplate
oluşturun, ServiceBusTemplate için varlık türünü ayarlayın. Bu örnekte örnek olarak Service Bus Kuyruğu alınıyor.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir ileti ağ geçidi bağlaması oluşturun.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Ağ geçidini kullanarak ileti gönderme.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Azure Service Bus'tan ileti alma
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Service Bus'a ileti almak için
ServiceBusInboundChannelAdapter
çekirdeğiyleServiceBusMessageListenerContainer
oluşturun. Bu örnekte örnek olarak Service Bus Kuyruğu alınıyor.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Daha önce oluşturduğumuz ileti kanalı aracılığıyla
ServiceBusInboundChannelAdapter
ile bir ileti alıcı bağlaması oluşturun.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
ObjectMapper'ı özelleştirmek için ServiceBusMessageConverter'ı yapılandırma
ServiceBusMessageConverter
, kullanıcıların ObjectMapper
özelleştirmesine izin vermek için yapılandırılabilir bir fasulye olarak yapılır.
Service Bus ileti üst bilgileri
Birden çok Spring üst bilgisi sabitine eşlenebilen bazı Service Bus üst bilgileri için, farklı Spring üst bilgilerinin önceliği listelenir.
Service Bus Üst Bilgileri ile Spring Üst Bilgileri arasında eşleme:
Service Bus ileti üst bilgileri ve özellikleri | Spring message header sabitleri | Tür | Yapılandırılabilir | Açıklama |
---|---|---|---|---|
İçerik türü | MessageHeaders#CONTENT_TYPE |
Dizgi | Evet | İletinin RFC2045 İçerik Türü tanımlayıcısı. |
Bağıntı Kimliği | ServiceBusMessageHeaders#CORRELATION_ID |
Dizgi | Evet | İletinin bağıntı kimliği |
İleti Kimliği | ServiceBusMessageHeaders#MESSAGE_ID |
Dizgi | Evet | İletinin ileti kimliği, bu üst bilgi MessageHeaders#ID 'den daha yüksek önceliğe sahiptir. |
İleti Kimliği | MessageHeaders#ID |
UUID | Evet | İletinin ileti kimliği, bu üst bilgi ServiceBusMessageHeaders#MESSAGE_ID 'dan daha düşük önceliğe sahiptir. |
Bölüm anahtarı | ServiceBusMessageHeaders#PARTITION_KEY |
Dizgi | Evet | İletiyi bölümlenmiş bir varlığa göndermek için bölüm anahtarı. |
Yanıtla | MessageHeaders#REPLY_CHANNEL |
Dizgi | Evet | Yanıtların gönderleneceği varlığın adresi. |
Oturum kimliğini yanıtlama | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Dizgi | Evet | İletinin ReplyToGroupId özellik değeri. |
Zamanlanan sıra saati utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Evet | İletinin Service Bus'ta sıralanması gereken tarih saat, bu üst bilgi AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE 'den daha yüksek önceliğe sahiptir. |
Zamanlanan sıra saati utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Tam sayı | Evet | Service Bus'ta iletinin sıraya alınması gereken tarih saat, bu üst bilgi ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME 'den daha düşük önceliğe sahiptir. |
Oturum Kimliği | ServiceBusMessageHeaders#SESSION_ID |
Dizgi | Evet | Oturum kullanan bir varlık için oturum IDentifier. |
Yaşam süresi | ServiceBusMessageHeaders#TIME_TO_LIVE |
Süre | Evet | Bu iletinin süresi dolmadan önceki süre. |
Hedef | ServiceBusMessageHeaders#TO |
Dizgi | Evet | yönlendirme senaryolarında gelecekte kullanılmak üzere ayrılmış ve aracı tarafından şu anda yoksayılan iletinin "to" adresi. |
Konu | ServiceBusMessageHeaders#SUBJECT |
Dizgi | Evet | İletinin konusu. |
Teslim edilemeyen harf hatası açıklaması | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Dizgi | Hayır | Teslim edilemeyen iletinin açıklaması. |
Teslim edilemeyen mektup nedeni | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Dizgi | Hayır | Bir iletinin yazılmama nedeni. |
Teslim edilemeyen harf kaynağı | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Dizgi | Hayır | İletinin teslim edilmediği varlık. |
Teslim sayısı | ServiceBusMessageHeaders#DELIVERY_COUNT |
uzun | Hayır | Bu iletinin istemcilere teslim etme sayısı. |
Sıralanmış sıra numarası | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
uzun | Hayır | Service Bus tarafından bir iletiye atanan sıralı sıra numarası. |
Sıralanan süre | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Hayır | Bu iletinin Service Bus'ta sıralandığı tarih saat. |
Son kullanma tarihi: | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Hayır | Bu iletinin süresinin dolacağı tarih saat. |
Kilit belirteci | ServiceBusMessageHeaders#LOCK_TOKEN |
Dizgi | Hayır | Geçerli ileti için kilit belirteci. |
Kilitlenene kadar | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Hayır | Bu iletinin kilidinin süresinin dolmasına neden olan tarih saat. |
Sıra numarası | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
uzun | Hayır | Service Bus tarafından bir iletiye atanan benzersiz numara. |
Devlet | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Hayır | Etkin, Ertelenmiş veya Zamanlanmış olabilecek iletinin durumu. |
Bölüm anahtarı desteği
Bu başlatıcı, ileti üst bilgisinde bölüm anahtarı ve oturum kimliği ayarlamaya izin vererek service bus bölümleme
Önerilen: üst bilgi anahtarı olarak ServiceBusMessageHeaders.PARTITION_KEY
kullanın.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Önerilmez ancak şu anda desteklenir: üst bilginin anahtarı olarak AzureHeaders.PARTITION_KEY
.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Not
İleti üst bilgilerinde hem ServiceBusMessageHeaders.PARTITION_KEY
hem de AzureHeaders.PARTITION_KEY
ayarlandığında, ServiceBusMessageHeaders.PARTITION_KEY
tercih edilir.
Oturum desteği
Bu örnekte, uygulamadaki bir iletinin oturum kimliğinin el ile nasıl ayarlanacağı gösterilmektedir.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Not
İleti üst bilgilerinde ServiceBusMessageHeaders.SESSION_ID
ayarlandığında ve farklı bir ServiceBusMessageHeaders.PARTITION_KEY
üst bilgisi de ayarlandığında, bölüm anahtarının değerinin üzerine yazmak için oturum kimliğinin değeri sonunda kullanılır.
Service Bus istemci özelliklerini özelleştirme
Geliştiriciler Service Bus İstemcisi özelliklerini özelleştirmek için AzureServiceClientBuilderCustomizer
kullanabilir. Aşağıdaki örnek, sessionIdleTimeout
içindeki ServiceBusClientBuilder
özelliğini özelleştirmektedir:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Örnekleri
Daha fazla bilgi için GitHub'daki azure-spring-boot-samples
Azure Depolama Kuyruğu ile Spring Tümleştirmesi
Temel kavramlar
Azure Kuyruk Depolama, çok sayıda iletiyi depolamaya yönelik bir hizmettir. HTTP veya HTTPS kullanarak kimliği doğrulanmış çağrılar aracılığıyla dünyanın herhangi bir yerinden iletilere erişebilirsiniz. Kuyruk iletisinin boyutu en fazla 64 KB olabilir. Kuyruk, depolama hesabının toplam kapasite sınırına kadar milyonlarca ileti içerebilir. Kuyruklar genellikle zaman uyumsuz olarak işlenmek üzere bir iş kapsamı oluşturmak için kullanılır.
Bağımlılık kurulumu
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfigürasyon
Bu başlatıcı aşağıdaki yapılandırma seçeneklerini sağlar:
Bağlantı yapılandırma özellikleri
Bu bölüm, Azure Depolama Kuyruğu'na bağlanmak için kullanılan yapılandırma seçeneklerini içerir.
Not
Azure kaynağına erişim için Microsoft Entra Id ile kimlik doğrulaması yapmak ve yetkilendirmek için bir güvenlik sorumlusu kullanmayı seçerseniz güvenlik sorumlusuna Azure kaynağına erişmek için yeterli izin verildiğinden emin olmak için microsoft entra id erişimi yetkilendirme
spring-cloud-azure-starter-integration-storage-queue'un bağlantı yapılandırılabilir özellikleri:
Mülk | Tür | Açıklama |
---|---|---|
spring.cloud.azure.storage.queue.enabled |
Boolean | Azure Depolama Kuyruğu'nın etkinleştirilip etkinleştirilmediği. |
spring.cloud.azure.storage.queue.connection-string | Dizgi | Depolama Kuyruğu Ad Alanı bağlantı dizesi değeri. |
spring.cloud.azure.storage.queue.accountName | Dizgi | Depolama Kuyruğu hesap adı. |
spring.cloud.azure.storage.queue.accountKey | Dizgi | Depolama Kuyruğu hesap anahtarı. |
spring.cloud.azure.storage.queue.endpoint |
Dizgi | Depolama Kuyruğu hizmet uç noktası. |
spring.cloud.azure.storage.queue.sasToken | Dizgi | Sas belirteci kimlik bilgileri |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | API istekleri yapılırken kullanılan QueueServiceVersion. |
spring.cloud.azure.storage.queue.messageEncoding | Dizgi | Kuyruk iletisi kodlama. |
Temel kullanım
Azure Depolama Kuyruğuna ileti gönderme
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
Bağlantı dizesi olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
Yönetilen kimlik olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Not
tenant-id
için izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.
Hizmet sorumlusu olarak kimlik bilgileri için application.yml dosyanızda aşağıdaki özellikleri yapılandırın:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Not
tenant-id
için izin verilen değerler şunlardır: common
, organizations
, consumers
veya kiracı kimliği. Bu değerler hakkında daha fazla bilgi için, Hata AADSTS50020 - Kimlik sağlayıcısından kullanıcı hesabı kiracımevcut değil bölümünün Yanlış uç nokta (kişisel ve kuruluş hesapları) kullanıldı bölümüne bakın. Tek kiracılı uygulamanızı dönüştürme hakkında bilgi için bkz. Tek kiracılı uygulamayı Microsoft Entra IDüzerinde çok kiracılıya dönüştürme.
Depolama Kuyruğuna ileti göndermek için
DefaultMessageHandler
çekirdeğiyleStorageQueueTemplate
oluşturun.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Bir ileti kanalı aracılığıyla yukarıdaki ileti işleyicisiyle bir İleti ağ geçidi bağlaması oluşturun.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Ağ geçidini kullanarak ileti gönderme.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Azure Depolama Kuyruğu'ndan ileti alma
Kimlik bilgisi yapılandırma seçeneklerini doldurun.
Giriş kanalı olarak bir ileti kanalı çekirdeği oluşturun.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
depolama kuyruğuna ileti almak için
StorageQueueMessageSource
bean ileStorageQueueTemplate
oluşturun.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Daha önce oluşturduğumuz ileti kanalı aracılığıyla son adımda oluşturulan StorageQueueMessageSource ile bir ileti alıcısı bağlaması oluşturun.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Örnekleri
Daha fazla bilgi için GitHub'daki azure-spring-boot-samples