Bulk Executor library からAzure Cosmos DB .NET V3 SDK のBulkサポートに移行する
適用対象: NoSQL
この記事は、.NET Bulk Executorライブラリを使用する既存のアプリケーションのコードを、最新バージョンの .NET SDK の一括サポート機能に移行するために必要な手順について説明します。
Bulkサポートを有効にする
CosmosClient
インスタンスで、AllowBulkExecution 構成を使用し、Bulkサポートを有効にする:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
各操作のタスクを作成する
.NET SDKにおけるBulkサポートは、 タスク並列ライブラリ、および同時に発動するグループ化操作を活用することによって作動する。
SDK にはドキュメントまたは操作のリストを入力パラメーターとして受け取る単一のメソッドがありませんが、一括で実行する操作ごとにタスクを作成する必要があります。その後、それらが完了するのを待ちます。
例えば最初の入力が、各項目が以下のスキーマを取るアイテムのリストであるとします:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
一括インポートを実行 (BulkExecutor.BulkImportAsync の使用と同様) する場合は、CreateItemAsync
の同時呼び出しが必要です。 次に例を示します。
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
一括アップデート(BulkExecutor.BulkImportAsyncの使用と同様)を実行したい場合は、項目の値を更新してからReplaceItemAsync
メソッドを同時に起動する必要があります。 次に例を示します。
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
一括削除(BulkExecutor.BulkDeleteAsyncの使用と同様)を実行したい場合は、DeleteItemAsync
を id
および各項目のパーテーションキーと同時に起動する必要があります。 次に例を示します。
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
タスクの結果の状態をCaptureします
以前のコード例では、タスクの同時実行リストを作成し、各タスクで CaptureOperationResponse
メソッドを呼び出しました。 このメソッドは、エラーや リクエストユニットの使用の追跡を行うことによって、BulkExecutorとしての要求した応答スキーマを維持することを可能にする拡張機能です。
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
OperationResponse
が以下のように示されているところは:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
同時に操作を実行する
タスクのリスト全体のスコープを追跡するには、このヘルパー クラスを使用します。
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
ExecuteAsync
メソッドはすべての操作が完了するまで待ちます。ユーザーは次のようにこのメソッドを使用できます。
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
統計をCaptureする
前のコードは、全ての操作が完了するまで待機し、それから必要な統計の計算を行います。 これらの統計は、一括実行プログラムのBulkImportResponseのものと類似しています。
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
BulkOperationResponse
には以下のものが含まれます:
- 一括サポートを通して操作の一覧を処理するのにかかった時間の合計。
- 正常に処理された操作の数。
- 消費された要求ユニットの合計。
- エラーが生じた場合は、例外を含む組み合わせの一覧およびロギングと識別目的のための関連項目が表示されます。
コンフィグレーションを再試行する
一括実行ライブラリには、MaxRetryWaitTimeInSeconds
および RetryOptionsの MaxRetryAttemptsOnThrottledRequests
を、ライブラリにコントロールを委任するために0
に設定するガイダンスがありました。
.NET SDKの一括サポートに関しては、非表示の動作はありません。 再試行オプションをCosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests および CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequestsを通して直接コンフィギュアする事が出来ます。
Note
データの量に基づき、供給された要求ユニットが予想よりはるかに低いとされた場合には、これらの値を高い値に設定することを検討してください。 一括操作は長時間かかりますが、再試行の回数が多いため、より高い可能性で完全に成功します。
パフォーマンスの向上
.NET SDKの他の操作と同様に、stream APIsを使用することでよりパフォーマンスが向上させ、不要なシリアル化を回避することにつながります。
stream APIsの使用は、ご使用のデータの性質がbytesのストリームのものと合致する場合においてのみ可能です(例:ファイルストリーム)。 そのような場合には、CreateItemStreamAsync
またはReplaceItemStreamAsync
、 DeleteItemStreamAsync
メソッドを使用し、ResponseMessage
(ItemResponse
ではなく)を操作することで、達成出来るスループットが向上します。
次のステップ
- .NET SDKリリースについての詳細は、Azure Cosmos DB SDK記事を参照してください。
- 完了した 移行ソースコードをGitHubから取得する。
- GitHub上の追加の一括サンプル
- Azure Cosmos DB への移行のための容量計画を実行しようとしていますか?
- 既存のデータベース クラスター内の仮想コアとサーバーの数のみがわかっている場合は、仮想コア数または仮想 CPU 数を使用した要求ユニットの見積もりに関するページを参照してください
- 現在のデータベース ワークロードに対する通常の要求レートがわかっている場合は、Azure Cosmos DB Capacity Planner を使用した要求ユニットの見積もりに関するページを参照してください