Executar operações em massa em dados do Azure Cosmos DB
APLICA-SE A: NoSQL
Este tutorial fornece instruções sobre como executar operações em massa no Azure Cosmos DB Java V4 SDK. Esta versão do SDK vem com a biblioteca de executor em massa interno. Se você estiver usando uma versão mais antiga do SDK do Java, é recomendável migrar para a versão mais recente. O Azure Cosmos DB Java V4 SDK é a solução atual recomendada para o suporte em massa em Java.
Atualmente, a biblioteca do executor em massa tem suporte apenas das contas do Azure Cosmos DB for NoSQL e da API para Gremlin. Para saber mais sobre como usar a biblioteca do .NET de executor em massa com a API para Gremlin, consulte Executar operações em massa no Gremlin do Azure Cosmos DB.
Pré-requisitos
Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.
Você pode experimentar o Azure Cosmos DB gratuitamente sem uma assinatura do Azure, sem nenhuma e sem compromisso. Ou você pode usar o Emulador do Azure Cosmos DB com o ponto de extremidade
https://localhost:8081
. A Chave Primária é fornecida nas Solicitações de autenticação.JDK (Java Development Kit) 1.8+
No Ubuntu, execute
apt-get install default-jdk
para instalar o JDK.Defina a variável de ambiente JAVA_HOME para apontar para a pasta onde o JDK está instalado.
Baixar e instalar um armazenamento binário Maven
- No Ubuntu, você pode executar
apt-get install maven
para instalar o Maven.
- No Ubuntu, você pode executar
Crie uma conta do Azure Cosmos DB for NoSQL usando as etapas descritas na seção Criar conta de banco de dados do artigo de início rápido do Java.
Clonar o aplicativo de exemplo
Agora vamos mudar para trabalhar com o código baixando um repositório de exemplos genéricos para o Java V4 SDK para o Azure Cosmos DB do GitHub. Esses aplicativos de exemplo executam operações CRUD e outras operações comuns no Azure Cosmos DB. Para clonar o repositório, abra um prompt de comando, navegue até o diretório onde você deseja copiar o aplicativo e execute o seguinte comando:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
O repositório clonado contém uma amostra SampleBulkQuickStartAsync.java
na pasta /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async
. O aplicativo gera documentos e executa operações para criar, executar upsert, substituir e excluir itens em massa no Azure Cosmos DB. Nas próximas seções, examinaremos o código no aplicativo de exemplo.
Execução em massa no Azure Cosmos DB
- As cadeias de caracteres de conexão do Azure Cosmos DB são lidas como argumentos e atribuídas a variáveis definidas no arquivo /
examples/common/AccountSettings.java
. Essas variáveis de ambiente devem ser definidas
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
Para executar um exemplo em massa, especifique sua Classe principal:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
O objeto
CosmosAsyncClient
é inicializado usando as instruções a seguir:client = new CosmosClientBuilder() .endpoint(AccountSettings.HOST) .key(AccountSettings.MASTER_KEY) .preferredRegions(preferredRegions) .contentResponseOnWriteEnabled(true) .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
A amostra cria um banco de dados e um contêiner assíncronos. Em seguida, ela cria vários documentos nos quais as operações em massa serão executadas. Ela adiciona esses documentos a um objeto
Flux<Family>
de fluxo reativo:Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); Family johnsonFamilyItem = Families.getJohnsonFamilyItem(); Family smithFamilyItem = Families.getSmithFamilyItem(); // Setup family items to create Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
A amostra contém métodos para criação em massa, execução de upsert, substituição e exclusão. Em cada método, mapeamos os documentos de famílias no fluxo BulkWriter
Flux<Family>
para várias chamadas de método emCosmosBulkOperations
. Essas operações são adicionadas a outro objeto de fluxo reativoFlux<CosmosItemOperation>
. O fluxo é então passado para o métodoexecuteBulkOperations
dacontainer
assíncrona que criamos no início e as operações são executadas em massa. Veja o método de criação em massa abaixo como exemplo:private void bulkCreateItems(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).blockLast(); }
Também há uma classe
BulkWriter.java
no mesmo diretório que o aplicativo de exemplo. Essa classe demonstra como tratar erros de limitação de taxa (429) e tempo limite (408) que podem ocorrer durante a execução em massa e tentar novamente essas operações com eficiência. Ela é implementada nos métodos abaixo, mostrando também como implementar o controle de taxa de transferência local e global.private void bulkUpsertItemsWithBulkWriterAbstraction() { Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() { ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group1") .setTargetThroughput(200) .build(); container.enableLocalThroughputControlGroup(groupConfig); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() { String controlContainerId = "throughputControlContainer"; CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId); database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block(); ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group-" + UUID.randomUUID()) .setTargetThroughput(200) .build(); GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) .setControlItemRenewInterval(Duration.ofSeconds(5)) .setControlItemExpireInterval(Duration.ofSeconds(20)) .build(); container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig); CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); }
Além disso, há métodos de criação em massa no exemplo que ilustram como adicionar processamento de resposta e definir opções de execução:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> { CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation(); if (cosmosBulkOperationResponse.getException() != null) { logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); } else if (cosmosBulkItemResponse == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) { logger.error( "The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " + "successfully with " + "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a"); } else { logger.info( "Item ID: [{}] Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName()); logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode()); logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge()); } if (cosmosBulkItemResponse == null) { return Mono.error(new IllegalStateException("No response retrieved.")); } else { return Mono.just(cosmosBulkItemResponse); } }).blockLast(); } private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) { CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); // The default value for maxMicroBatchConcurrency is 1. // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage. // // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not. // When the RU has already been under saturation, increasing the concurrency will not help the situation, // rather it may cause more 429 and request timeout. bulkExecutionOptions.setMaxMicroBatchConcurrency(2); Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast(); }
Dicas de desempenho
Considere os seguintes pontos para melhor desempenho ao usar a biblioteca bulk executor:
Para ter o melhor desempenho, execute o aplicativo de uma VM do Azure na mesma região da sua região de gravação da conta do Azure Cosmos DB.
Para alcançar maior taxa de transferência:
- Defina o tamanho do heap da JVM para um número grande o suficiente para evitar qualquer problema de memória na manipulação de grandes números de documentos. Tamanho do heap sugerido: máx (3GB, 3 * tamanho de [todos os documentos passados para API de importação em massa em um lote]).
- Há um tempo de pré-processamento, devido ao qual você obterá maior taxa de transferência ao executar operações em massa com um grande número de documentos. Portanto, se você deseja importar 10.000.000 documentos, executar a importação em massa 10 vezes em 10 lotes de documentos, cada um de tamanho 1.000.000, é preferível do que a execução da importação em massa 100 vezes em 100 lotes de documentos, cada um de tamanho 100.000 documentos.
É recomendado criar uma instância de um único objeto CosmosAsyncClient para o aplicativo inteiro em uma única máquina virtual que corresponda a um contêiner do Azure Cosmos DB específico.
Como uma execução de API de operação em massa única consome uma grande parte de ES de CPU e de rede do computador cliente. Isso acontece por geração de várias tarefas internamente, evite a geração de várias tarefas simultâneas no processo de aplicativo executando chamadas de API de operação em massa. Se uma única chamada à API de operação em massa em execução em uma única máquina virtual não puder consumir a taxa de transferência inteira do contêiner (se a taxa de transferência > do contêiner for de mais de 1 milhão de RU/s), prefira criar máquinas virtuais separadas para executar as chamadas à API de operação em massa simultaneamente.
Próximas etapas
- Para obter uma visão geral da funcionalidade de executor em massa, confira Visão geral do executor em massa.