Executar operações em massa em dados do Azure Cosmos DB

APLICA-SE A: NoSQL

Este tutorial fornece instruções sobre como executar operações em massa no Azure Cosmos DB Java V4 SDK. Esta versão do SDK vem com a biblioteca de executor em massa interno. Se você estiver usando uma versão mais antiga do SDK do Java, é recomendável migrar para a versão mais recente. O Azure Cosmos DB Java V4 SDK é a solução atual recomendada para o suporte em massa em Java.

Atualmente, a biblioteca do executor em massa tem suporte apenas das contas do Azure Cosmos DB for NoSQL e da API para Gremlin. Para saber mais sobre como usar a biblioteca do .NET de executor em massa com a API para Gremlin, consulte Executar operações em massa no Gremlin do Azure Cosmos DB.

Pré-requisitos

Clonar o aplicativo de exemplo

Agora vamos mudar para trabalhar com o código baixando um repositório de exemplos genéricos para o Java V4 SDK para o Azure Cosmos DB do GitHub. Esses aplicativos de exemplo executam operações CRUD e outras operações comuns no Azure Cosmos DB. Para clonar o repositório, abra um prompt de comando, navegue até o diretório onde você deseja copiar o aplicativo e execute o seguinte comando:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

O repositório clonado contém uma amostra SampleBulkQuickStartAsync.java na pasta /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. O aplicativo gera documentos e executa operações para criar, executar upsert, substituir e excluir itens em massa no Azure Cosmos DB. Nas próximas seções, examinaremos o código no aplicativo de exemplo.

Execução em massa no Azure Cosmos DB

  1. As cadeias de caracteres de conexão do Azure Cosmos DB são lidas como argumentos e atribuídas a variáveis definidas no arquivo /examples/common/AccountSettings.java. Essas variáveis de ambiente devem ser definidas
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Para executar um exemplo em massa, especifique sua Classe principal:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. O objeto CosmosAsyncClient é inicializado usando as instruções a seguir:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. A amostra cria um banco de dados e um contêiner assíncronos. Em seguida, ela cria vários documentos nos quais as operações em massa serão executadas. Ela adiciona esses documentos a um objeto Flux<Family> de fluxo reativo:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. A amostra contém métodos para criação em massa, execução de upsert, substituição e exclusão. Em cada método, mapeamos os documentos de famílias no fluxo BulkWriter Flux<Family> para várias chamadas de método em CosmosBulkOperations. Essas operações são adicionadas a outro objeto de fluxo reativo Flux<CosmosItemOperation>. O fluxo é então passado para o método executeBulkOperations da container assíncrona que criamos no início e as operações são executadas em massa. Veja o método de criação em massa abaixo como exemplo:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Também há uma classe BulkWriter.java no mesmo diretório que o aplicativo de exemplo. Essa classe demonstra como tratar erros de limitação de taxa (429) e tempo limite (408) que podem ocorrer durante a execução em massa e tentar novamente essas operações com eficiência. Ela é implementada nos métodos abaixo, mostrando também como implementar o controle de taxa de transferência local e global.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Além disso, há métodos de criação em massa no exemplo que ilustram como adicionar processamento de resposta e definir opções de execução:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

Dicas de desempenho

Considere os seguintes pontos para melhor desempenho ao usar a biblioteca bulk executor:

  • Para ter o melhor desempenho, execute o aplicativo de uma VM do Azure na mesma região da sua região de gravação da conta do Azure Cosmos DB.

  • Para alcançar maior taxa de transferência:

    • Defina o tamanho do heap da JVM para um número grande o suficiente para evitar qualquer problema de memória na manipulação de grandes números de documentos. Tamanho do heap sugerido: máx (3GB, 3 * tamanho de [todos os documentos passados para API de importação em massa em um lote]).
    • Há um tempo de pré-processamento, devido ao qual você obterá maior taxa de transferência ao executar operações em massa com um grande número de documentos. Portanto, se você deseja importar 10.000.000 documentos, executar a importação em massa 10 vezes em 10 lotes de documentos, cada um de tamanho 1.000.000, é preferível do que a execução da importação em massa 100 vezes em 100 lotes de documentos, cada um de tamanho 100.000 documentos.
  • É recomendado criar uma instância de um único objeto CosmosAsyncClient para o aplicativo inteiro em uma única máquina virtual que corresponda a um contêiner do Azure Cosmos DB específico.

  • Como uma execução de API de operação em massa única consome uma grande parte de ES de CPU e de rede do computador cliente. Isso acontece por geração de várias tarefas internamente, evite a geração de várias tarefas simultâneas no processo de aplicativo executando chamadas de API de operação em massa. Se uma única chamada à API de operação em massa em execução em uma única máquina virtual não puder consumir a taxa de transferência inteira do contêiner (se a taxa de transferência > do contêiner for de mais de 1 milhão de RU/s), prefira criar máquinas virtuais separadas para executar as chamadas à API de operação em massa simultaneamente.

Próximas etapas