Azure Cosmos DB verilerinde toplu işlemler gerçekleştirme

UYGULANANLAR: NoSQL

Bu öğreticide Azure Cosmos DB Java V4 SDK'sında toplu işlemler gerçekleştirme yönergeleri sağlanır. SDK'nın bu sürümü, yerleşik toplu yürütücü kitaplığıyla birlikte gelir. Java SDK'sının eski bir sürümünü kullanıyorsanız en son sürüme geçmeniz önerilir. Azure Cosmos DB Java V4 SDK'sı, Java toplu desteği için önerilen geçerli çözümdür.

Şu anda toplu yürütücü kitaplığı yalnızca NoSQL için Azure Cosmos DB ve Gremlin hesapları için API tarafından desteklenmektedir. Toplu yürütücü .NET kitaplığını Gremlin için API ile kullanma hakkında bilgi edinmek için bkz . Gremlin için Azure Cosmos DB'de toplu işlemler gerçekleştirme.

Önkoşullar

Örnek uygulamayı kopyalama

Şimdi GitHub'dan Azure Cosmos DB için Java V4 SDK'sı için genel bir örnek deposu indirerek kodla çalışmaya geçelim. Bu örnek uygulamalar Azure Cosmos DB'de CRUD işlemleri ve diğer yaygın işlemleri gerçekleştirir. Depoyu kopyalamak için bir komut istemi açın, uygulamayı kopyalamak istediğiniz dizine gidin ve aşağıdaki komutu çalıştırın:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

Kopyalanan depo klasörde bir örnek SampleBulkQuickStartAsync.java /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async içerir. Uygulama, Azure Cosmos DB'de öğeleri toplu olarak oluşturmak, eklemek, değiştirmek ve silmek için belgeler oluşturur ve işlemleri yürütür. Sonraki bölümlerde örnek uygulamadaki kodu gözden geçireceğiz.

Azure Cosmos DB'de toplu yürütme

  1. Azure Cosmos DB'nin bağlantı dizesi bağımsız değişken olarak okunur ve /examples/common/AccountSettings.javadosyasında tanımlanan değişkenlere atanır. Bu ortam değişkenleri ayarlanmalıdır
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Toplu örneği çalıştırmak için Main Sınıfını belirtin:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. CosmosAsyncClient Nesnesi aşağıdaki deyimler kullanılarak başlatılır:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. Örnek, zaman uyumsuz bir veritabanı ve kapsayıcı oluşturur. Daha sonra toplu işlemlerin yürütüleceği birden çok belge oluşturur. Bu belgeleri reaktif akış Flux<Family> nesnesine ekler:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. Örnek toplu oluşturma, upsert, değiştirme ve silme yöntemlerini içerir. Her yöntemde BulkWriter Flux<Family> akışındaki familys belgelerini içinde CosmosBulkOperationsbirden çok yöntem çağrısıyla eşleriz. Bu işlemler başka bir reaktif akış nesnesine Flux<CosmosItemOperation>eklenir. Akış daha sonra başlangıçta oluşturduğumuz zaman uyumsuz container yöntemine geçirilir executeBulkOperations ve işlemler toplu olarak yürütülür. Örnek olarak aşağıdaki toplu oluşturma yöntemine bakın:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Örnek uygulamayla aynı dizinde de bir sınıf BulkWriter.java vardır. Bu sınıf, toplu yürütme sırasında oluşabilecek hız sınırlama (429) ve zaman aşımı (408) hatalarının nasıl işleneceğini ve bu işlemlerin etkili bir şekilde yeniden denendiğini gösterir. Yerel ve genel aktarım hızı denetiminin nasıl uygulandığını da gösteren aşağıdaki yöntemlerde uygulanır.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Ayrıca, örnekte yanıt işleme ekleme ve yürütme seçeneklerini ayarlama işlemlerini gösteren toplu oluşturma yöntemleri vardır:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

Performans ipuçları

Toplu yürütücü kitaplığını kullanırken daha iyi performans için aşağıdaki noktaları göz önünde bulundurun:

  • En iyi performans için uygulamanızı Azure Cosmos DB hesabınızın yazma bölgesiyle aynı bölgedeki bir Azure VM'sinden çalıştırın.

  • Daha yüksek aktarım hızı elde etmek için:

    • Çok sayıda belgeyi işlerken herhangi bir bellek sorununu önlemek için JVM'nin yığın boyutunu yeterince büyük bir sayıya ayarlayın. Önerilen yığın boyutu: max(3 GB, 3 * sizeof(tek bir toplu işlemde toplu içeri aktarma API'sine geçirilen tüm belgeler)).
    • Çok sayıda belgeyle toplu işlemler gerçekleştirirken daha yüksek aktarım hızı elde ettiğiniz için ön işleme süresi vardır. Bu nedenle, 10.000.000 belgeyi içeri aktarmak istiyorsanız, her biri 1.000.000 boyutunda 10 toplu belgede 10 kez toplu içeri aktarma çalıştırmak, her biri 100.000 boyutunda belge üzerinde toplu içeri aktarma çalıştırmaktan daha tercih edilir.
  • Belirli bir Azure Cosmos DB kapsayıcısına karşılık gelen tek bir sanal makine içinde uygulamanın tamamı için tek bir CosmosAsyncClient nesnesi örneği oluşturmanız önerilir.

  • Tek bir toplu işlem API'sinin yürütülmesi istemci makinesinin CPU ve ağ GÇ'sinin büyük bir öbeklerini tükettiğinden. Bu durum, dahili olarak birden çok görev ortaya çıkararak gerçekleşir, toplu işlem API çağrılarını yürüten her uygulama işleminizde birden çok eşzamanlı görev oluşturmaktan kaçının. Tek bir sanal makinede çalıştırılan tek bir toplu işlem API'si çağrıları kapsayıcınızın aktarım hızının tamamını tüketemiyorsa (kapsayıcınızın aktarım hızı > 1 milyon RU/sn ise), toplu işlem API çağrılarını eşzamanlı olarak yürütmek için ayrı sanal makineler oluşturmak tercih edilir.

Sonraki adımlar