Programação assíncrona no SDK do Azure para Java

Este artigo descreve o modelo de programação assíncrona no SDK do Azure para Java.

Inicialmente, o SDK do Azure continha apenas APIs assíncronas sem bloqueio para interagir com os serviços do Azure. Essas APIs permitem que você use o SDK do Azure para criar aplicativos escaláveis que usam recursos do sistema de forma eficiente. No entanto, o SDK do Azure para Java também contém clientes síncronos para atender a um público mais amplo e também tornar nossas bibliotecas de cliente acessíveis para usuários não familiarizados com programação assíncrona. (Ver Acessível nas diretrizes de design do SDK do Azure.) Como tal, todas as bibliotecas de cliente Java no SDK do Azure para Java oferecem clientes assíncronos e síncronos. No entanto, recomendamos o uso de clientes assíncronos para sistemas de produção para maximizar o uso de recursos do sistema.

Fluxos reativos

Se você examinar a seção Clientes de Serviço Assíncronos nas Diretrizes de Design do SDK do Java Azure, notará que, em vez de usar CompletableFuture o fornecido pelo Java 8, nossas APIs assíncronas usam tipos reativos. Por que escolhemos tipos reativos em vez de tipos que estão disponíveis nativamente no JDK?

O Java 8 introduziu recursos como Streams, Lambdas e CompletableFuture. Esses recursos fornecem muitos recursos, mas têm algumas limitações.

CompletableFuture fornece recursos baseados em retorno de chamada, sem bloqueio, e a interface permite a CompletionStage fácil composição de uma série de operações assíncronas. Os Lambdas tornam essas APIs baseadas em push mais legíveis. Os fluxos fornecem operações de estilo funcional para lidar com uma coleção de elementos de dados. No entanto, os fluxos são síncronos e não podem ser reutilizados. CompletableFuture permite que você faça uma única solicitação, fornece suporte para um retorno de chamada e espera uma única resposta. No entanto, muitos serviços de nuvem exigem a capacidade de transmitir dados - Hubs de Eventos, por exemplo.

Os fluxos reativos podem ajudar a superar essas limitações transmitindo elementos de uma fonte para um assinante. Quando um assinante solicita dados de uma fonte, a fonte envia qualquer número de resultados de volta. Não precisa enviá-los todos de uma vez. A transferência acontece ao longo de um período de tempo, sempre que a fonte tem dados para enviar.

Nesse modelo, o assinante registra manipuladores de eventos para processar dados quando eles chegam. Essas interações baseadas em push notificam o assinante por meio de sinais distintos:

  • Uma onSubscribe() chamada indica que a transferência de dados está prestes a começar.
  • Uma onError() chamada indica que houve um erro, que também marca o fim da transferência de dados.
  • Uma onComplete() chamada indica a conclusão bem-sucedida da transferência de dados.

Ao contrário do Java Streams, os fluxos reativos tratam os erros como eventos de primeira classe. Os fluxos reativos têm um canal dedicado para que a fonte comunique quaisquer erros ao assinante. Além disso, os fluxos reativos permitem que o assinante negocie a taxa de transferência de dados para transformar esses fluxos em um modelo push-pull.

A especificação Reative Streams fornece um padrão para como a transferência de dados deve ocorrer. Em um alto nível, a especificação define as quatro interfaces a seguir e especifica regras sobre como essas interfaces devem ser implementadas.

  • O Publisher é a fonte de um fluxo de dados.
  • O assinante é o consumidor de um fluxo de dados.
  • A subscrição gere o estado da transferência de dados entre um editor e um subscritor.
  • O processador é um editor e um assinante.

Existem algumas bibliotecas Java bem conhecidas que fornecem implementações dessa especificação, como RxJava, Akka Streams, Vert.x e Project Reator.

O SDK do Azure para Java adotou o Project Reator para oferecer suas APIs assíncronas. O principal fator que impulsionou essa decisão foi fornecer uma integração suave com o Spring Webflux, que também usa o Project Reator. Outro fator que contribuiu para escolher o Project Reator em vez do RxJava foi que o Project Reator usa Java 8, mas o RxJava, na época, ainda estava no Java 7. O Project Reator também oferece um rico conjunto de operadores que podem ser compostos e permitem que você escreva código declarativo para a construção de pipelines de processamento de dados. Outra coisa legal sobre o Project Reator é que ele tem adaptadores para converter tipos de Project Reator para outros tipos de implementação populares.

Comparando APIs de operações síncronas e assíncronas

Discutimos os clientes síncronos e as opções para clientes assíncronos. A tabela abaixo resume a aparência das APIs projetadas usando essas opções:

Tipo de API Sem valor Valor único Vários valores
Java padrão - APIs síncronas void T Iterable<T>
Java padrão - APIs assíncronas CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Interfaces de fluxos reativos Publisher<Void> Publisher<T> Publisher<T>
Projeto de Implementação de Reatores de Correntes Reativas Mono<Void> Mono<T> Flux<T>

Por uma questão de completude, vale a pena mencionar que o Java 9 introduziu a classe Flow que inclui as quatro interfaces de fluxos reativos. No entanto, essa classe não inclui nenhuma implementação.

Usar APIs assíncronas no SDK do Azure para Java

A especificação de fluxos reativos não diferencia entre tipos de editores. Na especificação de fluxos reativos, os editores simplesmente produzem zero ou mais elementos de dados. Em muitos casos, há uma distinção útil entre um editor que produz no máximo um elemento de dados e um que produz zero ou mais. Em APIs baseadas em nuvem, essa distinção indica se uma solicitação retorna uma resposta de valor único ou uma coleção. O Project Reator fornece dois tipos para fazer esta distinção - Mono e Flux. Uma API que retorna um conterá uma resposta que tem no máximo um valor, e uma API que retorna um MonoFlux conterá uma resposta que tem zero ou mais valores.

Por exemplo, suponha que você use um ConfigurationAsyncClient para recuperar uma configuração armazenada usando o serviço de Configuração de Aplicativo do Azure. (Para obter mais informações, consulte O que é a Configuração de Aplicativo do Azure?.)

Se você criar um e chamar getConfigurationSetting() o cliente, ele retornará um , que indica que a resposta contém um ConfigurationAsyncClientMonoúnico valor. No entanto, chamar esse método sozinho não faz nada. O cliente ainda não fez uma solicitação ao serviço de Configuração de Aplicativo do Azure. Neste estágio, o Mono<ConfigurationSetting> retornado por esta API é apenas uma "montagem" do pipeline de processamento de dados. Isso significa que a configuração necessária para consumir os dados está completa. Para realmente acionar a transferência de dados (ou seja, para fazer a solicitação para o serviço e obter a resposta), você deve se inscrever para o devolvido Mono. Então, ao lidar com esses fluxos reativos, você deve se lembrar de ligar subscribe() porque nada acontece até que você faça isso.

O exemplo a seguir mostra como assinar e Mono imprimir o valor de configuração no console.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Observe que, depois de chamar getConfigurationSetting() o cliente, o código de exemplo assina o resultado e fornece três lambdas separados. O primeiro lambda consome dados recebidos do serviço, que são acionados após uma resposta bem-sucedida. O segundo lambda é acionado se houver um erro ao recuperar a configuração. O terceiro lambda é invocado quando o fluxo de dados é concluído, o que significa que não são esperados mais elementos de dados desse fluxo.

Nota

Tal como acontece com toda a programação assíncrona, depois de a subscrição ser criada, a execução prossegue como habitualmente. Se não houver nada para manter o programa ativo e em execução, ele pode ser encerrado antes que a operação assíncrona seja concluída. O thread principal chamado não esperará até que subscribe() você faça a chamada de rede para a Configuração do Aplicativo do Azure e receba uma resposta. Em sistemas de produção, você pode continuar a processar outra coisa, mas neste exemplo você pode adicionar um pequeno atraso chamando Thread.sleep() ou usando um CountDownLatch para dar à operação assíncrona a chance de ser concluída.

Como mostrado no exemplo a seguir, as APIs que retornam um também seguem um Flux padrão semelhante. A diferença é que o primeiro retorno de chamada fornecido ao subscribe() método é chamado várias vezes para cada elemento de dados na resposta. O erro ou os retornos de chamada de conclusão são chamados exatamente uma vez e são considerados como sinais de terminal. Nenhum outro retorno de chamada é invocado se qualquer um desses sinais for recebido do editor.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Contrapressão

O que acontece quando a fonte está produzindo os dados a um ritmo mais rápido do que o assinante pode lidar? O assinante pode ficar sobrecarregado com dados, o que pode levar a erros de falta de memória. O assinante precisa de uma maneira de se comunicar com o editor para diminuir a velocidade quando ele não consegue acompanhar. Por padrão, quando você chama subscribe() um como mostrado no exemplo acima, o assinante está solicitando um Flux fluxo ilimitado de dados, indicando ao editor para enviar os dados o mais rápido possível. Esse comportamento nem sempre é desejável, e o assinante pode ter que controlar a taxa de publicação através de "backpressure". Backpressure permite que o assinante assuma o controle do fluxo de elementos de dados. Um assinante solicitará um número limitado de elementos de dados que pode manipular. Depois de concluir o processamento destes elementos, o subscritor pode solicitar mais. Usando backpressure, você pode transformar um modelo push-para transferência de dados em um modelo push-pull.

O exemplo a seguir mostra como você pode controlar a taxa na qual os eventos são recebidos pelo consumidor de Hubs de Eventos:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Quando o assinante "se conecta" pela primeira vez ao editor, o editor entrega ao assinante uma Subscription instância, que gerencia o estado da transferência de dados. Este Subscription é o meio através do qual o assinante pode aplicar backpressure ligando request() para especificar quantos mais elementos de dados ele pode manipular.

Se o assinante solicitar mais de um elemento de dados cada vez que ligar onNext(), request(10) por exemplo, o editor enviará os próximos 10 elementos imediatamente se estiverem disponíveis ou quando estiverem disponíveis. Esses elementos se acumulam em um buffer na extremidade do assinante e, como cada onNext() chamada solicitará mais 10, a lista de pendências continua crescendo até que o editor não tenha mais elementos de dados para enviar ou o buffer do assinante estoure, resultando em erros de falta de memória.

Cancelar uma subscrição

Uma subscrição gere o estado da transferência de dados entre um editor e um subscritor. A assinatura fica ativa até que o editor tenha concluído a transferência de todos os dados para o assinante ou o assinante não esteja mais interessado em receber dados. Há algumas maneiras de cancelar uma assinatura, conforme mostrado abaixo.

O exemplo a seguir cancela a assinatura descartando o assinante:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

O exemplo a seguir cancela a assinatura chamando o cancel() método em Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Conclusão

Os threads são recursos caros que você não deve desperdiçar esperando respostas de chamadas de serviço remoto. À medida que a adoção de arquiteturas de microsserviços aumenta, a necessidade de escalar e usar recursos de forma eficiente torna-se vital. As APIs assíncronas são favoráveis quando há operações ligadas à rede. O SDK do Azure para Java oferece um conjunto avançado de APIs para operações assíncronas para ajudar a maximizar os recursos do sistema. Nós encorajamos você a experimentar nossos clientes assíncronos.

Para obter mais informações sobre os operadores que melhor se adequam às suas tarefas específicas, consulte Qual operador eu preciso?, no Guia de Referência do Reator 3.

Próximos passos

Agora que você entende melhor os vários conceitos de programação assíncrona, é importante aprender a iterar os resultados. Para obter mais informações sobre as melhores estratégias de iteração e detalhes de como a paginação funciona, consulte Paginação e iteração no SDK do Azure para Java.