Azure Cosmos DB verilerinde toplu işlemler gerçekleştirme
UYGULANANLAR: NoSQL
Bu öğreticide Azure Cosmos DB Java V4 SDK'sında toplu işlemler gerçekleştirme yönergeleri sağlanır. SDK'nın bu sürümü, yerleşik toplu yürütücü kitaplığıyla birlikte gelir. Java SDK'sının eski bir sürümünü kullanıyorsanız en son sürüme geçmeniz önerilir. Azure Cosmos DB Java V4 SDK'sı, Java toplu desteği için önerilen geçerli çözümdür.
Şu anda toplu yürütücü kitaplığı yalnızca NoSQL için Azure Cosmos DB ve Gremlin hesapları için API tarafından desteklenmektedir. Toplu yürütücü .NET kitaplığını Gremlin için API ile kullanma hakkında bilgi edinmek için bkz . Gremlin için Azure Cosmos DB'de toplu işlemler gerçekleştirme.
Önkoşullar
Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
Azure Cosmos DB'yi Azure aboneliği olmadan ücretsiz olarak, ücretsiz olarak ve taahhütlerle deneyebilirsiniz. İsterseniz uç nokta ile Azure Cosmos DB Öykünücüsü'ne de
https://localhost:8081
erişebilirsiniz. Birincil Anahtar, Kimlik doğrulama istekleri bölümünde sağlanır.Java Development Kit (JDK) 1.8+
Ubuntu’da JDK’yi yüklemek için
apt-get install default-jdk
komutunu çalıştırın.JAVA_HOME ortam değişkenini JDK’nin yüklü olduğu klasöre işaret edecek şekilde ayarladığınızdan emin olun.
Bir Maven ikili arşivi indirin ve yükleyin
- Ubuntu’da Maven’i yüklemek için
apt-get install maven
komutunu çalıştırabilirsiniz.
- Ubuntu’da Maven’i yüklemek için
Java hızlı başlangıç makalesinin veritabanı hesabı oluşturma bölümünde açıklanan adımları kullanarak NoSQL için Azure Cosmos DB hesabı oluşturun.
Örnek uygulamayı kopyalama
Şimdi GitHub'dan Azure Cosmos DB için Java V4 SDK'sı için genel bir örnek deposu indirerek kodla çalışmaya geçelim. Bu örnek uygulamalar Azure Cosmos DB'de CRUD işlemleri ve diğer yaygın işlemleri gerçekleştirir. Depoyu kopyalamak için bir komut istemi açın, uygulamayı kopyalamak istediğiniz dizine gidin ve aşağıdaki komutu çalıştırın:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
Kopyalanan depo klasörde bir örnek SampleBulkQuickStartAsync.java
/azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async
içerir. Uygulama, Azure Cosmos DB'de öğeleri toplu olarak oluşturmak, eklemek, değiştirmek ve silmek için belgeler oluşturur ve işlemleri yürütür. Sonraki bölümlerde örnek uygulamadaki kodu gözden geçireceğiz.
Azure Cosmos DB'de toplu yürütme
- Azure Cosmos DB'nin bağlantı dizesi bağımsız değişken olarak okunur ve /
examples/common/AccountSettings.java
dosyasında tanımlanan değişkenlere atanır. Bu ortam değişkenleri ayarlanmalıdır
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
Toplu örneği çalıştırmak için Main Sınıfını belirtin:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
CosmosAsyncClient
Nesnesi aşağıdaki deyimler kullanılarak başlatılır:client = new CosmosClientBuilder() .endpoint(AccountSettings.HOST) .key(AccountSettings.MASTER_KEY) .preferredRegions(preferredRegions) .contentResponseOnWriteEnabled(true) .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
Örnek, zaman uyumsuz bir veritabanı ve kapsayıcı oluşturur. Daha sonra toplu işlemlerin yürütüleceği birden çok belge oluşturur. Bu belgeleri reaktif akış
Flux<Family>
nesnesine ekler:Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); Family johnsonFamilyItem = Families.getJohnsonFamilyItem(); Family smithFamilyItem = Families.getSmithFamilyItem(); // Setup family items to create Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
Örnek toplu oluşturma, upsert, değiştirme ve silme yöntemlerini içerir. Her yöntemde BulkWriter
Flux<Family>
akışındaki familys belgelerini içindeCosmosBulkOperations
birden çok yöntem çağrısıyla eşleriz. Bu işlemler başka bir reaktif akış nesnesineFlux<CosmosItemOperation>
eklenir. Akış daha sonra başlangıçta oluşturduğumuz zaman uyumsuzcontainer
yöntemine geçirilirexecuteBulkOperations
ve işlemler toplu olarak yürütülür. Örnek olarak aşağıdaki toplu oluşturma yöntemine bakın:private void bulkCreateItems(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).blockLast(); }
Örnek uygulamayla aynı dizinde de bir sınıf
BulkWriter.java
vardır. Bu sınıf, toplu yürütme sırasında oluşabilecek hız sınırlama (429) ve zaman aşımı (408) hatalarının nasıl işleneceğini ve bu işlemlerin etkili bir şekilde yeniden denendiğini gösterir. Yerel ve genel aktarım hızı denetiminin nasıl uygulandığını da gösteren aşağıdaki yöntemlerde uygulanır.private void bulkUpsertItemsWithBulkWriterAbstraction() { Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() { ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group1") .setTargetThroughput(200) .build(); container.enableLocalThroughputControlGroup(groupConfig); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() { String controlContainerId = "throughputControlContainer"; CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId); database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block(); ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group-" + UUID.randomUUID()) .setTargetThroughput(200) .build(); GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) .setControlItemRenewInterval(Duration.ofSeconds(5)) .setControlItemExpireInterval(Duration.ofSeconds(20)) .build(); container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig); CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); }
Ayrıca, örnekte yanıt işleme ekleme ve yürütme seçeneklerini ayarlama işlemlerini gösteren toplu oluşturma yöntemleri vardır:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> { CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation(); if (cosmosBulkOperationResponse.getException() != null) { logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); } else if (cosmosBulkItemResponse == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) { logger.error( "The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " + "successfully with " + "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a"); } else { logger.info( "Item ID: [{}] Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName()); logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode()); logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge()); } if (cosmosBulkItemResponse == null) { return Mono.error(new IllegalStateException("No response retrieved.")); } else { return Mono.just(cosmosBulkItemResponse); } }).blockLast(); } private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) { CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); // The default value for maxMicroBatchConcurrency is 1. // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage. // // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not. // When the RU has already been under saturation, increasing the concurrency will not help the situation, // rather it may cause more 429 and request timeout. bulkExecutionOptions.setMaxMicroBatchConcurrency(2); Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast(); }
Performans ipuçları
Toplu yürütücü kitaplığını kullanırken daha iyi performans için aşağıdaki noktaları göz önünde bulundurun:
En iyi performans için uygulamanızı Azure Cosmos DB hesabınızın yazma bölgesiyle aynı bölgedeki bir Azure VM'sinden çalıştırın.
Daha yüksek aktarım hızı elde etmek için:
- Çok sayıda belgeyi işlerken herhangi bir bellek sorununu önlemek için JVM'nin yığın boyutunu yeterince büyük bir sayıya ayarlayın. Önerilen yığın boyutu: max(3 GB, 3 * sizeof(tek bir toplu işlemde toplu içeri aktarma API'sine geçirilen tüm belgeler)).
- Çok sayıda belgeyle toplu işlemler gerçekleştirirken daha yüksek aktarım hızı elde ettiğiniz için ön işleme süresi vardır. Bu nedenle, 10.000.000 belgeyi içeri aktarmak istiyorsanız, her biri 1.000.000 boyutunda 10 toplu belgede 10 kez toplu içeri aktarma çalıştırmak, her biri 100.000 boyutunda belge üzerinde toplu içeri aktarma çalıştırmaktan daha tercih edilir.
Belirli bir Azure Cosmos DB kapsayıcısına karşılık gelen tek bir sanal makine içinde uygulamanın tamamı için tek bir CosmosAsyncClient nesnesi örneği oluşturmanız önerilir.
Tek bir toplu işlem API'sinin yürütülmesi istemci makinesinin CPU ve ağ GÇ'sinin büyük bir öbeklerini tükettiğinden. Bu durum, dahili olarak birden çok görev ortaya çıkararak gerçekleşir, toplu işlem API çağrılarını yürüten her uygulama işleminizde birden çok eşzamanlı görev oluşturmaktan kaçının. Tek bir sanal makinede çalıştırılan tek bir toplu işlem API'si çağrıları kapsayıcınızın aktarım hızının tamamını tüketemiyorsa (kapsayıcınızın aktarım hızı > 1 milyon RU/sn ise), toplu işlem API çağrılarını eşzamanlı olarak yürütmek için ayrı sanal makineler oluşturmak tercih edilir.
Sonraki adımlar
- Toplu yürütücü işlevselliğine genel bakış için bkz . toplu yürütücüye genel bakış.