Spring Cloud Stream con Azure Service Bus

En este artículo se muestra cómo usar Spring Cloud Stream Binder para enviar y recibir mensajes de queues y topics de Azure Service Bus.

Azure proporciona una plataforma de mensajería asincrónica denominada Azure Service Bus ("Service Bus") que se basa en el estándar Advanced Message Queueing Protocol 1.0 (AMQP 1.0). Service Bus puede usarse en el intervalo de plataformas de Azure admitidas.

Requisitos previos

Nota:

Para conceder a su cuenta acceso a los recursos de Azure Service Bus, asigne el rol Azure Service Bus Data Sender y Azure Service Bus Data Receiver a la cuenta de Microsoft Entra que está usando actualmente. Para obtener más información sobre cómo conceder roles de acceso, consulte Asignación de roles de Azure mediante Azure Portal y Autenticación y Autenticación y autorización de una aplicación con Microsoft Entra ID para acceder a las entidades de Azure Service Bus.

Importante

Se necesita Spring Boot versión 2.5 o superior para completar los pasos descritos en este artículo.

Envío y recepción de mensajes de Azure Service Bus

Con una cola o tema para Azure Service Bus, puede enviar y recibir mensajes mediante Spring Cloud Azure Stream Binder Service Bus.

Para instalar el módulo Spring Cloud Azure Stream Binder Service Bus, agregue las siguientes dependencias al archivo pom.xml:

  • La lista de materiales (BOM) de Azure de Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.18.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Nota:

    Si usa Spring Boot 2.x, asegúrese de establecer la versión de spring-cloud-azure-dependencies en 4.19.0. Esta lista de materiales (BOM) debe configurarse en la sección <dependencyManagement> del archivo pom.xml. Esto garantiza que todas las dependencias de Spring Cloud Azure usen la misma versión. Para obtener más información sobre la versión que se usa para esta lista de materiales, consulte Qué versión de Spring Cloud Azure debo usar.

  • El artefacto de Spring Cloud Azure Stream Binder Service Bus:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Incorporación del código de la aplicación

Siga estos pasos para configurar la aplicación para usar una cola o un tema de Service Bus para enviar y recibir mensajes.

  1. Configure las credenciales de Service Bus en el archivo de configuración application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    En la siguiente tabla se describen los campos de la configuración:

    Campo Descripción
    spring.cloud.azure.servicebus.namespace Especifique el espacio de nombres que obtuvo en Service Bus desde Azure Portal.
    spring.cloud.stream.bindings.consume-in-0.destination Especifique la cola o el tema de Service Bus que usó en este tutorial.
    spring.cloud.stream.bindings.supply-out-0.destination Especifique el mismo valor que se usó para el destino de entrada.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Especifique si se van a liquidar los mensajes automáticamente. Si se establece como false, se agregará un encabezado de Checkpointer para permitir a los desarrolladores liquidar los mensajes manualmente.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Especifique el tipo de entidad para el enlace de salida, puede ser queue o topic.
    spring.cloud.function.definition Especifique el bean funcional que se va a enlazar a los destinos externos expuestos por los enlaces.
    spring.cloud.stream.poller.fixed-delay Especifique un retraso fijo para el sondeador predeterminado en milisegundos. El valor predeterminado es 1000 L. El valor recomendado es 60000.
    spring.cloud.stream.poller.initial-delay Especifique el retraso inicial para los desencadenadores periódicos. El valor predeterminado es 0.
  2. Edite el archivo de clase de inicio para mostrar el siguiente contenido.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Sugerencia

    En este tutorial, no hay ninguna operación de autenticación en las configuraciones ni en el código. Sin embargo, la conexión a los servicios de Azure requiere autenticación. Para completar la autenticación, debe usar Azure Identity. Spring Cloud Azure usa DefaultAzureCredential, que la biblioteca de Azure Identity proporciona para ayudarle a obtener credenciales sin cambios en el código.

    DefaultAzureCredential admite varios métodos de autenticación y determina qué método se usa en tiempo de ejecución. Este enfoque permite que la aplicación use diferentes métodos de autenticación en distintos entornos, como entornos locales o de producción, sin implementar código específico del entorno. Para obtener más información, vea DefaultAzureCredential.

    Para completar la autenticación en entornos de desarrollo local, puede usar la CLI de Azure, Visual Studio Code, PowerShell u otros métodos. Para obtener más información, consulte Autenticación de Azure en entornos de desarrollo de Java. Para completar la autenticación en entornos de hospedaje de Azure, se recomienda usar la identidad administrada asignada por el usuario. Para obtener más información, consulta ¿Qué son las identidades administradas para recursos de Azure?

  3. Inicie la aplicación. Los mensajes como el ejemplo siguiente se publicarán en el registro de la aplicación:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Pasos siguientes