What I want to achieve: control the Azure Service Bus consumer so that I can start and stop receiving messages from a queue/topic on demand.
Below is a detailed explanation.
I have integrated Azure Service Bus into my application, and it currently starts listening for messages as soon as the Spring Boot application starts. I want to change this behaviour: when the ApplicationReadyEvent is fired, the Service Bus consumer should not be listening yet; I then want to perform some task, and only after that task has finished do I want to enable the Service Bus consumer so that it starts receiving messages from the topic/queue.
How can I achieve that?
application.yml
spring:
  cloud:
    azure:
      servicebus:
        namespace: **********
xxx:
  azure:
    servicebus:
      connection: ***********
      queue: **********
AzureConfiguration.java
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import java.nio.charset.StandardCharsets;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
/**
 * Spring configuration that connects an Azure Service Bus listener container to a
 * Spring Integration flow.
 *
 * <p>Messages received by the inbound channel adapter are sent to the Service Bus input
 * channel, converted from {@code byte[]} to {@code String}, and forwarded to the sensor
 * data channel.
 */
@Configuration
public class AzureConfiguration {

    private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
    private static final String SENSOR_DATA_CHANNEL = "zzzzz";
    private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";

    /** Connection string, read from {@code xxx.azure.servicebus.connection}. */
    @Value("${xxx.azure.servicebus.connection}")
    private String serviceBusConnection;

    /** Entity name to listen on, read from {@code xxx.azure.servicebus.queue}. */
    @Value("${xxx.azure.servicebus.queue}")
    private String serviceBusQueue;

    /**
     * Creates the listener container for the configured Service Bus entity.
     *
     * @param processorFactory factory handed to the container
     * @return a container configured with the connection string, the entity name and
     *     auto-complete enabled
     */
    @Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
    public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(
            ServiceBusProcessorFactory processorFactory) {
        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        containerProperties.setConnectionString(serviceBusConnection);
        containerProperties.setEntityName(serviceBusQueue);
        containerProperties.setAutoComplete(true);
        return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
    }

    /**
     * Creates the inbound channel adapter that publishes received messages to the
     * Service Bus input channel.
     *
     * @param inputChannel channel that receives the adapter's output
     * @param listenerContainer container the adapter is built on
     * @return the adapter, with auto-startup switched off
     */
    @Bean
    public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
            @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
            @Qualifier(SERVICE_BUS_LISTENER_CONTAINER)
                    ServiceBusMessageListenerContainer listenerContainer) {
        ServiceBusInboundChannelAdapter adapter =
                new ServiceBusInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inputChannel);
        // Auto-startup is disabled so the adapter is not started as part of the normal
        // context start-up; NOTE(review): confirm the listener container bean itself
        // does not auto-start independently of the adapter.
        adapter.setAutoStartup(false);
        return adapter;
    }

    /**
     * @return the direct channel that receives messages from the inbound adapter
     */
    @Bean(name = SERVICE_BUS_INPUT_CHANNEL)
    public MessageChannel serviceBusInputChannel() {
        return new DirectChannel();
    }

    /**
     * @return the direct channel that receives the String-converted payloads
     */
    @Bean(name = SENSOR_DATA_CHANNEL)
    public MessageChannel sensorDataChannel() {
        return new DirectChannel();
    }

    /**
     * Defines the flow from the Service Bus input channel to the sensor data channel.
     *
     * @return a flow that converts each {@code byte[]} payload to a {@code String}
     */
    @Bean
    public IntegrationFlow serviceBusMessageFlow() {
        return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
                // Decode explicitly as UTF-8: new String(byte[]) would use the platform
                // default charset, which differs between hosts on Java versions before 18.
                .<byte[], String>transform(payload -> new String(payload, StandardCharsets.UTF_8))
                .channel(SENSOR_DATA_CHANNEL)
                .get();
    }
}
AppEventListenerService.java
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Service that reacts to application lifecycle events.
 */
@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService {

    /**
     * Handles {@link ApplicationReadyEvent}.
     *
     * <p>At present this only logs on entry and exit. The three steps in between are
     * placeholders that have not been implemented yet.
     */
    @EventListener(ApplicationReadyEvent.class)
    public void OnApplicationStarted() {
        log.debug("Enter OnApplicationStarted");

        // Step 1 (not implemented): disable the Azure Service Bus message listener.
        // Step 2 (not implemented): perform the start-up task.
        // Step 3 (not implemented): enable the Azure Service Bus message listener.

        log.debug("Exit OnApplicationStarted");
    }
}
In the code above, in AppEventListenerService.java:
// Disable Azure Bus Message Listener - here I want to stop the Service Bus consumer from receiving messages from the topic/queue.
// Enable Azure Bus Message Listener - here I want to start the Service Bus consumer so that it receives messages from the topic/queue again.