Azure Cosmos DB'de toplu işlemler gerçekleştirmek için toplu yürütücü .NET kitaplığını kullanma

UYGULANANLAR: NoSQL

Not

Bu makalede açıklanan toplu yürütücü kitaplığı, .NET SDK 2.x sürümünü kullanan uygulamalar için korunur. Yeni uygulamalar için. .NET SDK sürüm 3.x ile doğrudan kullanılabilen toplu desteği kullanabilirsiniz ve herhangi bir dış kitaplık gerektirmez.

Şu anda toplu yürütücü kitaplığını kullanıyorsanız ve daha yeni SDK'da toplu desteğe geçmeyi planlıyorsanız, uygulamanızı geçirmek için Geçiş kılavuzundaki adımları kullanın.

Bu öğreticide, belgeleri bir Azure Cosmos DB kapsayıcısına içeri aktarmak ve güncelleştirmek için toplu yürütücü .NET kitaplığını kullanma yönergeleri sağlanır. Toplu yürütücü kitaplığı ve bunun büyük aktarım hızı ve depolamayı kullanmanıza nasıl yardımcı olduğu hakkında bilgi edinmek için bkz. Azure Cosmos DB toplu yürütücü kitaplığına genel bakış. Bu öğreticide, rastgele oluşturulan belgeleri bir Azure Cosmos DB kapsayıcısına toplu olarak içeri aktaran örnek bir .NET uygulaması görürsünüz. Verileri içeri aktardıktan sonra kitaplık, belirli belge alanlarında gerçekleştirilecek işlemler olarak düzeltme ekleri belirterek içeri aktarılan verileri toplu olarak nasıl güncelleştirebileceğinizi gösterir.

Şu anda toplu yürütücü kitaplığı yalnızca NoSQL için Azure Cosmos DB ve Gremlin hesapları için API tarafından desteklenmektedir. Bu makalede, NoSQL hesapları için API ile toplu yürütücü .NET kitaplığının nasıl kullanılacağı açıklanmaktadır. Toplu yürütücü .NET kitaplığını Gremlin hesapları için API ile kullanmayı öğrenmek için bkz . Toplu yürütücü kitaplığı kullanarak Gremlin için Azure Cosmos DB'de verileri toplu olarak alma.

Önkoşullar

Örnek uygulamayı kopyalama

Şimdi GitHub'dan örnek bir .NET uygulaması indirerek kodla çalışmaya geçelim. Bu uygulama, Azure Cosmos DB hesabınızda depolanan veriler üzerinde toplu işlemler gerçekleştirir. Uygulamayı kopyalamak için bir komut istemi açın, kopyalamak istediğiniz dizine gidin ve aşağıdaki komutu çalıştırın:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

Kopyalanan depo, BulkImportSample ve BulkUpdateSample adlı iki örnek içerir. Örnek uygulamalardan birini açabilir, App.config dosyasındaki bağlantı dizesi Azure Cosmos DB hesabınızın bağlantı dizesi güncelleştirebilir, çözümü derleyebilir ve çalıştırabilirsiniz.

BulkImportSample uygulaması rastgele belgeler oluşturur ve bunları Azure Cosmos DB hesabınıza toplu olarak içeri aktarır. BulkUpdateSample uygulaması, belirli belge alanlarında gerçekleştirilecek işlemler olarak düzeltme ekleri belirterek içeri aktarılan belgeleri toplu olarak güncelleştirir. Sonraki bölümlerde, bu örnek uygulamaların her birinde kodu gözden geçireceksiniz.

Azure Cosmos DB hesabına verileri toplu içeri aktarma

  1. BulkImportSample klasörüne gidin ve BulkImportSample.sln dosyasını açın.

  2. Azure Cosmos DB'nin bağlantı dizesi aşağıdaki kodda gösterildiği gibi App.config dosyasından alınır:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    Toplu içeri aktarıcı yeni bir veritabanı ve veritabanı adı, kapsayıcı adı ve App.config dosyasında belirtilen aktarım hızı değerleriyle bir kapsayıcı oluşturur.

  3. Ardından, DocumentClient nesnesi Doğrudan TCP bağlantı moduyla başlatılır:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. BulkExecutor nesnesi, bekleme süresi ve kısıtlanmış istekler için yüksek bir yeniden deneme değeriyle başlatılır. Ardından, kullanım ömrü boyunca tıkanıklık denetimini BulkExecutor'a geçirmek için 0 olarak ayarlanır.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. Uygulama BulkImportAsync API'sini çağırır. .NET kitaplığı toplu içeri aktarma API'sinin iki aşırı yüklemesini sağlar: biri serileştirilmiş JSON belgelerinin listesini, diğeri de seri durumdan çıkarılmış POCO belgelerinin listesini kabul eder. Bu aşırı yüklenmiş yöntemlerin tanımları hakkında daha fazla bilgi edinmek için API belgelerine bakın.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    BulkImportAsync yöntemi aşağıdaki parametreleri kabul eder:

    Parametre Açıklama
    enableUpsert Belgelerde upsert işlemlerini etkinleştirmek için bir bayrak. Verilen kimliğe sahip bir belge zaten varsa, belge güncelleştirilir. Varsayılan olarak false olarak ayarlanır.
    disableAutomaticIdGeneration Kimliğin otomatik olarak oluşturulmasını devre dışı bırakmak için bir bayrak. Varsayılan olarak true olarak ayarlanır.
    maxConcurrencyPerPartitionKeyRange Bölüm anahtarı aralığı başına en yüksek eşzamanlılık derecesi. null olarak ayarlanması kitaplığın varsayılan 20 değerini kullanmasına neden olur.
    maxInMemorySortingBatchSize Belge numaralandırıcısından çekilen ve her aşamada API çağrısına geçirilen en fazla belge sayısı. Toplu içeri aktarmadan önce gerçekleşen bellek içi sıralama aşaması için. Bu parametrenin null olarak ayarlanması kitaplığın varsayılan en düşük değeri (documents.count, 1000000) kullanmasına neden olur.
    cancellationToken Toplu içeri aktarma işleminden düzgün bir şekilde çıkmak için iptal belirteci.

Toplu içeri aktarma yanıtı nesne tanımı
Toplu içeri aktarma API çağrısının sonucu aşağıdaki öznitelikleri içerir:

Parametre Açıklama
NumberOfDocumentsImported (uzun) Toplu içeri aktarma API çağrısına sağlanan toplam belgelerden başarıyla içeri aktarılan toplam belge sayısı.
TotalRequestUnitsConsumed (çift) Toplu içeri aktarma API çağrısı tarafından kullanılan toplam istek birimi (RU).
TotalTimeTaken (TimeSpan) Yürütmeyi tamamlamak için toplu içeri aktarma API çağrısı tarafından geçen toplam süre.
BadInputDocuments (Liste<nesnesi>) Toplu içeri aktarma API çağrısında başarıyla içeri aktarılamamış hatalı biçimli belgelerin listesi. Döndürülen belgeleri düzeltin ve içeri aktarmayı yeniden deneyin. Hatalı biçimlendirilmiş belgeler, kimlik değeri dize olmayan belgeleri içerir (null veya başka bir veri türü geçersiz olarak kabul edilir).

Azure Cosmos DB hesabınızdaki verileri toplu güncelleştirme

BulkUpdateAsync API'sini kullanarak mevcut belgeleri güncelleştirebilirsiniz. Bu örnekte, alanı yeni bir değere ayarlar Name ve alanı varolan belgelerden kaldırırsınız Description . Desteklenen güncelleştirme işlemlerinin tamamı için API belgelerine bakın.

  1. BulkUpdateSample klasörüne gidin ve BulkUpdateSample.sln dosyasını açın.

  2. İlgili alan güncelleştirme işlemleriyle birlikte güncelleştirme öğelerini tanımlayın. Bu örnekte, alanı güncelleştirmek Name için SetUpdateOperation ve alanı tüm belgelerden kaldırmak Description için UnsetUpdateOperation kullanırsınız. Ayrıca, belge alanını belirli bir değere göre artırma, belirli değerleri dizi alanına gönderme veya dizi alanından belirli bir değeri kaldırma gibi başka işlemler de gerçekleştirebilirsiniz. Toplu güncelleştirme API'sinin sağladığı farklı yöntemler hakkında bilgi edinmek için API belgelerine bakın.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. Uygulama BulkUpdateAsync API'sini çağırır. BulkUpdateAsync yönteminin tanımı hakkında bilgi edinmek için API belgelerine bakın.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    BulkUpdateAsync yöntemi aşağıdaki parametreleri kabul eder:

    Parametre Açıklama
    maxConcurrencyPerPartitionKeyRange Bölüm anahtarı aralığı başına en yüksek eşzamanlılık derecesi. Bu parametre null olarak ayarlanırsa, kitaplık varsayılan değeri (20) kullanır.
    maxInMemorySortingBatchSize Her aşamada API çağrısına geçirilen güncelleştirme öğeleri numaralandırıcısından çekilen en fazla güncelleştirme öğesi sayısı. Toplu güncelleştirmeden önce gerçekleşen bellek içi sıralama aşaması için. Bu parametrenin null olarak ayarlanması kitaplığın varsayılan en düşük değeri (updateItems.count, 1000000) kullanmasına neden olur.
    cancellationToken Toplu güncelleştirme işleminden düzgün bir şekilde çıkmak için iptal belirteci.

Toplu güncelleştirme yanıtı nesne tanımı
Toplu güncelleştirme API çağrısının sonucu aşağıdaki öznitelikleri içerir:

Parametre Açıklama
NumberOfDocumentsUpdated (uzun) Toplu güncelleştirme API'si çağrısına sağlanan toplam belge sayısının dışında başarıyla güncelleştirilen belge sayısı.
TotalRequestUnitsConsumed (çift) Toplu güncelleştirme API çağrısı tarafından kullanılan toplam istek birimi (RU) .
TotalTimeTaken (TimeSpan) Toplu güncelleştirme API'sinin yürütmeyi tamamlamak için aldığı toplam süre.

Performans ipuçları

Toplu yürütücü kitaplığını kullanırken daha iyi performans için aşağıdaki noktaları göz önünde bulundurun:

  • En iyi performans için uygulamanızı Azure Cosmos DB hesabınızın yazma bölgesiyle aynı bölgede yer alan bir Azure sanal makinesinden çalıştırın.

  • Belirli bir Azure Cosmos DB kapsayıcısına karşılık gelen tek bir sanal makinede uygulamanın tamamı için tek bir BulkExecutor nesnesi örneği oluşturmanız önerilir.

  • Tek bir toplu işlem API'sinin yürütülmesi, birden çok görevi dahili olarak oluştururken istemci makinenin CPU ve ağ GÇ'sinin büyük bir öbeklerini kullanır. Toplu işlem API çağrılarını yürüten uygulama işleminizde birden çok eşzamanlı görev oluşturmaktan kaçının. Tek bir sanal makinede çalışan tek bir toplu işlem API'si çağrısı kapsayıcının aktarım hızının tamamını (kapsayıcınızın aktarım hızı > 1 milyon RU/sn ise) tüketemiyorsa, toplu işlem API çağrılarını eşzamanlı olarak yürütmek için ayrı sanal makineler oluşturmak tercih edilir.

  • Hedef Azure Cosmos DB kapsayıcısının InitializeAsync() bölüm eşlemesini getirmek için bir BulkExecutor nesnesi örneği oluşturarak yönteminin çağrıldığından emin olun.

  • Uygulamanızın App.Config dosyasında daha iyi performans için gcServer'ın etkinleştirildiğinden emin olun

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • Kitaplık, bir günlük dosyasına veya konsolda toplanabilir izlemeler yayar. Her ikisini de etkinleştirmek için uygulamanızın App.Config dosyasına aşağıdaki kodu ekleyin:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Sonraki adımlar