Barramento de Serviço do Azure biblioteca de clientes para Java – versão 7.14.4

O Barramento de Serviço do Microsoft Azure é um agente de mensagens de integração empresarial totalmente gerenciado. O Barramento de Serviço pode separar aplicativos e serviços. O Barramento de Serviço oferece uma plataforma confiável e segura para a transferência assíncrona de dados e estados. Os dados são transferidos entre diferentes aplicativos e serviços usando mensagens. Se você quiser saber mais sobre Barramento de Serviço do Azure, talvez queira examinar: O que é o Barramento de Serviço

A biblioteca de clientes Barramento de Serviço do Azure permite enviar e receber mensagens Barramento de Serviço do Azure e pode ser usada para:

  • Transfira dados comerciais, como ordens de venda ou compra, diários ou movimentos de estoque.
  • Desacoplar aplicativos para melhorar a confiabilidade e a escalabilidade de aplicativos e serviços. Clientes e serviços não precisam estar online ao mesmo tempo.
  • Habilite as relações 1:n entre publicadores e assinantes.
  • Implemente fluxos de trabalho que exijam ordenação ou adiamento de mensagens.

Código-fonte | Documentação | de referência da APIDocumentação do produto | Amostras | Pacote (Maven)

Introdução

Pré-requisitos

Para criar rapidamente os recursos necessários do Barramento de Serviço no Azure e receber uma cadeia de conexão para eles, você pode implantar nosso modelo de exemplo clicando em:

Incluir o pacote

Incluir o arquivo da BOM

Inclua o azure-sdk-bom em seu projeto para assumir a dependência da versão ga (disponibilidade geral) da biblioteca. No trecho a seguir, substitua o espaço reservado {bom_version_to_target} pelo número de versão. Para saber mais sobre a BOM, consulte o BOM README do SDK do AZURE.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Depois, inclua a dependência direta na seção de dependências sem a marca de versão.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
  </dependency>
</dependencies>

Incluir dependência direta

Se você quiser assumir a dependência de uma versão específica da biblioteca que não está presente na BOM, adicione a dependência direta ao seu projeto da seguinte maneira.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.14.4</version>
</dependency>

Autenticar o cliente

Para que a biblioteca de clientes do Barramento de Serviço interaja com o Barramento de Serviço, ela precisará entender como se conectar e autorizar com ele.

Criar clientes do Barramento de Serviço usando uma cadeia de conexão

O meio mais fácil para autenticação é usar uma cadeia de conexão, que foi criada automaticamente ao criar um namespace do Barramento de Serviço. Se você não estiver familiarizado com políticas de acesso compartilhado no Azure, convém seguir o guia passo a passo para obter uma cadeia de conexão do Barramento de Serviço.

Os clientes remetente e receptor do Barramento de Serviço assíncrono e síncrono são instanciados usando ServiceBusClientBuilder. Os snippets abaixo criam um remetente do Barramento de Serviço síncrono e um receptor assíncrono, respectivamente.

ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .receiver()
    .topicName("<< TOPIC NAME >>")
    .subscriptionName("<< SUBSCRIPTION NAME >>")
    .buildAsyncClient();

Criar um cliente do Barramento de Serviço usando a plataforma de Identidade da Microsoft (antigo Azure Active Directory)

O SDK do Azure para Java dá suporte ao pacote de Identidade do Azure, simplificando a obtenção de credenciais do plataforma de identidade da Microsoft. Primeiro, adicione o pacote:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>
  • Problema conhecido: o arquivo pom.xml deve listar azure-messaging-servicebus antes azure-identity das bibliotecas de cliente. Esse problema foi resolvido com azure-identity:1.2.1. Veja aqui para obter mais detalhes.

As maneiras implementadas de solicitar uma credencial estão no com.azure.identity.credential pacote. O exemplo a seguir mostra como usar um segredo do cliente do aplicativo AAD (Azure Active Directory) para autorizar com Barramento de Serviço do Azure.

Autorizando com DefaultAzureCredential

A autorização é mais fácil usando DefaultAzureCredential. Ele encontra a melhor credencial a ser usada em seu ambiente de execução. Para obter mais informações sobre como usar a autorização do Azure Active Directory com o Barramento de Serviço, consulte a documentação associada.

Use a credencial de token retornada para autenticar o cliente:

TokenCredential credential = new DefaultAzureCredentialBuilder()
    .build();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .credential("<<fully-qualified-namespace>>", credential)
    .receiver()
    .queueName("<<queue-name>>")
    .buildAsyncClient();

Principais conceitos

Você pode interagir com os tipos de recursos primários em um Namespace do Barramento de Serviço, do qual vários podem existir e em qual transmissão de mensagem real ocorre. O namespace geralmente serve como um contêiner de aplicativo:

  • Uma fila permite o envio e o recebimento de mensagens, ordenadas primeiro a entrar primeiro a sair. Geralmente, ele é usado para comunicação ponto a ponto.
  • Um tópico é mais adequado para cenários de editor e assinante. Um tópico publica mensagens em assinaturas, das quais várias podem existir simultaneamente.
  • Uma assinatura recebe mensagens de um tópico. Cada assinatura é independente e recebe uma cópia da mensagem enviada ao tópico.

Clientes do Barramento de Serviço

O construtor ServiceBusClientBuilder é usado para criar todos os clientes do Barramento de Serviço.

Exemplos

Enviar mensagens

Você precisará criar um assíncrono ServiceBusSenderAsyncClient ou síncrono ServiceBusSenderClient para enviar mensagens. Cada remetente pode enviar mensagens para uma fila ou um tópico.

O snippet abaixo cria um síncrono ServiceBusSenderClient para publicar uma mensagem em uma fila.

ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
List<ServiceBusMessage> messages = Arrays.asList(
    new ServiceBusMessage("Hello world").setMessageId("1"),
    new ServiceBusMessage("Bonjour").setMessageId("2"));

sender.sendMessages(messages);

// When you are done using the sender, dispose of it.
sender.close();

Receber mensagens

Para receber mensagens, você precisará criar um ServiceBusProcessorClient com retornos de chamada para mensagens de entrada e qualquer erro que ocorra no processo. Em seguida, você pode iniciar e parar o cliente conforme necessário.

Ao receber mensagens com o modo PeekLock , ele informa ao agente que a lógica do aplicativo deseja liquidar (por exemplo, concluir, abandonar) as mensagens recebidas explicitamente.

// Sample code that processes a single message which is received in PeekLock mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
    // handling message reaches desired state such that it doesn't require Service Bus to redeliver
    // the same message, then context.complete() should be called otherwise context.abandon().
    final boolean success = Math.random() < 0.5;
    if (success) {
        try {
            context.complete();
        } catch (Exception completionError) {
            System.out.printf("Completion of the message %s failed\n", message.getMessageId());
            completionError.printStackTrace();
        }
    } else {
        try {
            context.abandon();
        } catch (Exception abandonError) {
            System.out.printf("Abandoning of the message %s failed\n", message.getMessageId());
            abandonError.printStackTrace();
        }
    }
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    System.err.println("Error occurred while receiving message: " + errorContext.getException());
};

// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                                .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
                                .processor()
                                .queueName("<< QUEUE NAME >>")
                                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                                .disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
                                .processMessage(processMessage)
                                .processError(processError)
                                .disableAutoComplete()
                                .buildProcessorClient();

// Starts the processor in the background and returns immediately
processorClient.start();

Ao receber mensagens com o modo ReceiveAndDelete , informa ao agente para considerar todas as mensagens que ele envia para o cliente receptor como liquidadas quando enviadas.

// Sample code that processes a single message which is received in ReceiveAndDelete mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    System.out.printf("handler processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
        message.getSequenceNumber(), message.getBody());
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    System.err.println("Error occurred while receiving message: " + errorContext.getException());
};

// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .processor()
    .queueName("<< QUEUE NAME >>")
    .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
    .processMessage(processMessage)
    .processError(processError)
    .disableAutoComplete()
    .buildProcessorClient();

// Starts the processor in the background and returns immediately
processorClient.start();

Há quatro maneiras de resolver mensagens usando os métodos no contexto da mensagem passados para o retorno de chamada.

  • Concluído – faz com que a mensagem seja excluída da fila ou do tópico.
  • Abandonar – libera o bloqueio do receptor na mensagem, permitindo que a mensagem seja recebida por outros receptores.
  • Adiar – adia o recebimento da mensagem por meios normais. Para receber mensagens adiadas, o número de sequência da mensagem precisa ser retido.
  • Mensagens mortas – move a mensagem para a fila de mensagens mortas. Isso impedirá que a mensagem seja recebida novamente. Para receber mensagens da fila de mensagens mortas, um receptor com escopo para a fila de mensagens mortas é necessário.

Enviar e receber de filas ou tópicos habilitados para sessão

O uso de sessões exige que você crie uma fila ou assinatura habilitada para sessão. Você pode ler mais sobre como configurar isso em "Sessões de mensagem".

As sessões de Barramento de Serviço do Azure permitem o tratamento conjunto e ordenado de sequências não associadas de mensagens relacionadas. As sessões podem ser usadas em padrões PEPS (primeiro a entrar, primeiro a sair) e solicitação-resposta. Qualquer remetente pode criar uma sessão ao enviar mensagens para um tópico ou fila definindo a ServiceBusMessage.setSessionId(String) propriedade como algum identificador definido pelo aplicativo que seja exclusivo para a sessão.

Ao contrário de filas ou assinaturas não habilitadas para sessão, apenas um único receptor pode ler de uma sessão a qualquer momento. Quando um receptor busca uma sessão, o Barramento de Serviço bloqueia a sessão desse receptor e tem acesso exclusivo às mensagens nessa sessão.

Enviar uma mensagem para uma sessão

Crie um ServiceBusSenderClient para uma assinatura de fila ou tópico habilitada para sessão. A configuração ServiceBusMessage.setSessionId(String) em um ServiceBusMessage publicará a mensagem nessa sessão. Se a sessão não existir, ela será criada.

// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
    .setSessionId("greetings");

sender.sendMessage(message);

Receber mensagens de uma sessão

O recebimento de mensagens de sessões é semelhante ao recebimento de mensagens de uma fila ou assinatura não habilitada para sessão. A diferença está no construtor e na classe que você usa.

Em caso que não seja de sessão, você usaria o subconstrutor processor(). No caso de sessões, você usaria o subconstrutor sessionProcessor(). Ambos os subconstrutores criarão uma instância do ServiceBusProcessorClient configurada para funcionar em uma sessão ou em uma entidade de Barramento de Serviço que não seja de sessão. No caso do processador de sessão, você pode passar o número máximo de sessões que deseja que o processador processe simultaneamente também.

Criar um receptor de fila de mensagens mortas

Barramento de Serviço do Azure filas e assinaturas de tópico fornecem uma sub-fila secundária, chamada DLQ (fila de mensagens mortas). A fila de mensagens mortas não precisa ser explicitamente criada e não pode ser excluída ou de alguma forma gerenciada independentemente da entidade principal. Para assinaturas de fila ou tópico de sessão habilitada ou que não são de sessão, o receptor de mensagens mortas pode ser criado da mesma maneira que mostrado abaixo. Saiba mais sobre a fila de mensagens mortas aqui.

ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .receiver() // Use this for session or non-session enabled queue or topic/subscriptions
    .topicName("<< TOPIC NAME >>")
    .subscriptionName("<< SUBSCRIPTION NAME >>")
    .subQueue(SubQueue.DEAD_LETTER_QUEUE)
    .buildClient();

Compartilhamento de conexão entre clientes

A criação da conexão física com o Barramento de Serviço requer recursos. Um aplicativo deve compartilhar a conexão
entre clientes que podem ser obtidos compartilhando o construtor de nível superior, conforme mostrado abaixo.

// Create shared builder.
ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>");
// Create receiver and sender which will share the connection.
ServiceBusReceiverClient receiver = sharedConnectionBuilder
    .receiver()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
ServiceBusSenderClient sender = sharedConnectionBuilder
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();

Quando usar 'ServiceBusProcessorClient'.

Quando usar 'ServiceBusProcessorClient', 'ServiceBusReceiverClient' ou ServiceBusReceiverAsyncClient? O processador é criado usando 'ServiceBusReceiverAsyncClient', ele fornece uma maneira conveniente de receber mensagens com preenchimento automático padrão e renovação automática de bloqueios de mensagens no modo 'PEEK_LOCK'. O processador é apropriado quando os aplicativos não fizeram a movimentação completa para o cliente receptor assíncrono e desejam processar a mensagem no modo síncrono. O processador recebe mensagens para sempre porque se recupera dos erros de rede internamente. A chamada de função 'ServiceBusProcessorClient:processMessage()' é feita para cada mensagem. Como alternativa, você também pode usar 'ServiceBusReceiverClient', ele é um cliente de nível inferior e fornece uma gama mais ampla de APIs. Se o processamento assíncrono for
adequado para seu aplicativo, você pode usar 'ServiceBusReceiverAsyncClient'.

Solução de problemas

Habilitar o log do cliente

O SDK do Azure para Java oferece uma história de log consistente para ajudar a solucionar problemas de erros do aplicativo e agilizar a resolução. Os logs produzidos capturam o fluxo de um aplicativo antes que acessem o estado do terminal para ajudar a localizar o problema raiz. Exiba o wiki de log para obter diretrizes sobre como habilitar o registro em log.

Habilitar o registro em log de transporte AMQP

Se habilitar o log do cliente não for suficiente para diagnosticar seus problemas. Você pode habilitar o registro em log em um arquivo na biblioteca AMQP subjacente, Qpid Proton-J. O Qpid Proton-J usa java.util.logging. Você pode habilitar o registro em log criando um arquivo de configuração com o conteúdo abaixo. Ou defina proton.trace.level=ALL e qualquer opção de configuração desejada para a java.util.logging.Handler implementação. As classes de implementação e suas opções podem ser encontradas no javadoc do SDK do Java 8.

Para rastrear os quadros de transporte AMQP, defina a variável de ambiente: PN_TRACE_FRM=1.

Exemplo de arquivo "logging.properties"

O arquivo de configuração abaixo registra a saída de rastreamento de proton-j para o arquivo "proton-trace.log".

handlers=java.util.logging.FileHandler
.level=OFF
proton.trace.level=ALL
java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern=proton-trace.log
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n

Exceções comuns

Exceção AMQP

Essa é uma exceção geral para falhas relacionadas ao AMQP, que inclui os erros amqp como ErrorCondition e o contexto que causou essa exceção como AmqpErrorContext. isTransient é um booliano que indica se a exceção é um erro transitório ou não. Se ocorrer uma exceção AMQP transitória, a biblioteca de clientes repetirá a operação quantas vezes o AmqpRetryOptions permitir. Palavras posteriores, a operação falha e uma exceção é propagada de volta para o usuário.

AmqpErrorCondition contém condições de erro comuns ao protocolo AMQP e usadas pelos serviços do Azure. Quando uma exceção AMQP é gerada, examinar o campo de condição de erro pode informar os desenvolvedores sobre por que a exceção AMQP ocorreu e, se possível, como atenuar essa exceção. Uma lista de todas as exceções do AMQP pode ser encontrada em Erros de Transporte do OASIS AMQP versão 1.0.

A maneira recomendada de resolver a exceção específica que a exceção AMQP representa é seguir as diretrizes de Exceções de Mensagens do Barramento de Serviço .

Noções básicas sobre o comportamento das APIs

O documento aqui fornece informações sobre o comportamento esperado da API síncrona receiveMessages ao usá-la para obter mais de uma mensagem (também conhecida como pré-busca implícita).

Próximas etapas

Além dos discutidos, a biblioteca de clientes Barramento de Serviço do Azure oferece suporte para muitos cenários adicionais para ajudar a aproveitar o conjunto completo de recursos do serviço Barramento de Serviço do Azure. Para ajudar a explorar alguns desses cenários, o conjunto de exemplos a seguir está disponível aqui.

Participante

Se você quiser se tornar um contribuidor ativo para este projeto, consulte nossas Diretrizes de Contribuição para obter mais informações.

Impressões