Azure Blob Storage の変更フィードを処理する

変更フィードでは、ストレージ アカウント内の BLOB と BLOB メタデータに対して行われるすべての変更のトランザクション ログが提供されます。 この記事では、BLOB 変更フィード プロセッサ ライブラリを使用して、変更フィード レコードを読み取る方法を示します。

変更フィードの詳細については、Azure Blob Storage の変更フィードに関するページを参照してください。

プロジェクトの設定

このセクションでは、.NET 用 Blob 変更フィード クライアント ライブラリを操作するためのプロジェクトを準備する手順について説明します。

パッケージをインストールする

プロジェクト ディレクトリから、dotnet add package コマンドを使用して、.NET 用 Azure Storage Blob 変更フィード クライアント ライブラリのパッケージをインストールします。 この例では、コマンドに --prerelease フラグを追加して、最新のプレビュー バージョンをインストールします。

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

この記事の中のコード例では、Azure Blob StorageAzure Identity パッケージも使用します。

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

using ディレクティブを追加します

コード ファイルに、次の using ディレクティブを追加します。

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

クライアント オブジェクトの作成

アプリケーションを Blob Storage に接続するには、BlobServiceClient クラスのインスタンスを作成します。 次の例では、認可のために DefaultAzureCredential を使用してクライアント オブジェクトを作成する方法を示します。 詳細については、「Blob Storage へのアクセスを承認して接続する」を参照してください。 変更フィードを操作するには、Azure RBAC 組み込みロールのストレージ BLOB データ閲覧者以上が必要です。

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

このクライアント オブジェクトは、この記事の中で示す一部のメソッドにパラメーターとして渡されます。

変更フィード内のレコードを読み取る

Note

変更フィードは、ストレージ アカウントの不変および読み取り専用のエンティティです。 任意の数のアプリケーションで、変更フィードの読み取りと処理を都合のよいときに同時に個別に行うことができます。 アプリケーションでの読み取り時に、変更フィードからレコードが削除されることはありません。 使用する各リーダーの読み取りまたは反復処理の状態は独立しており、アプリケーションによってのみ維持されます。

次のコード例では、変更フィード内のすべてのレコードを反復処理し、それらをリストに追加してから、変更フィード イベントの一覧を返します。

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

次のコード例では、変更フィード イベントの一覧から一部の値を出力します。

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

保存位置からレコードの読み取りを再開する

変更フィードに自分の読み取り位置を保存してから、後でレコードの反復処理を再開するように選択できます。 読み取り位置を保存するには、変更フィード カーソルを取得します。 このカーソルは文字列であり、アプリケーションはその文字列を、アプリケーションの設計に適した任意の手段 (ファイルやデータベースなど) で保存できます。

この例では、変更フィード内のすべてのレコードを反復処理し、それらをリストに追加し、カーソルを保存します。 リストとカーソルは呼び出し元に返されます。

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

レコードのストリーム処理

変更フィード レコードが変更フィードにコミットされる際に、それらを処理することを選択できます。 「仕様」を参照してください。 変更イベントは、平均 60 秒の期間に変更フィードに発行されます。 ポーリング間隔を指定する場合は、この期間を考慮して新しい変更をポーリングすることをお勧めします。

この例では、変更を定期的にポーリングします。 変更レコードが存在する場合、このコードではそれらのレコードが処理され、変更フィード カーソルが保存されます。 このようにすると、プロセスがいったん停止されてから再び開始される場合、アプリケーションでカーソルが使用され、最後に中断した箇所のレコードの処理が再開されます。 この例では、デモの目的でカーソルをローカル ファイルに保存しますが、アプリケーションではユーザーのシナリオに最も適した任意の形式で保存できます。

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

特定の時間の範囲内のレコードを読み取る

特定の時間範囲内のレコードを読み取ることができます。 この例では、特定の日付と時間の範囲内にある変更フィード内のすべてのレコードを反復処理し、それらをリストに追加して、そのリストを返します。

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

指定した開始時刻は、最も近い時間に切り捨てられ、終了時刻は最も近い時間に切り上げられます。 開始時刻の前と終了時刻の後に発生したイベントがユーザーに表示される可能性があります。 また、開始時刻と終了時刻の間に発生する一部のイベントが表示されない場合もあります。 これは、イベントが開始時刻の前の時間中または終了時刻の後の時間中に記録される可能性があるためです。

次のステップ