Verarbeiten des Änderungsfeeds in Azure Blob Storage

Der Änderungsfeed stellt Transaktionsprotokolle für alle Änderungen bereit, die in den Blobs und den Blobmetadaten in Ihrem Speicherkonto auftreten. In diesem Artikel wird gezeigt, wie Sie Änderungsfeeddatensätze mithilfe der Change Feed Processor-Bibliothek für Blobs lesen.

Weitere Informationen zum Änderungsfeed finden Sie unter Änderungsfeed in Azure Blob Storage.

Einrichten des Projekts

In diesem Abschnitt wird beschrieben, wie ein Projekt zum Arbeiten mit der Blobänderungsfeed-Clientbibliothek für .NET vorbereitet wird.

Installieren von Paketen

Installieren Sie aus Ihrem Projektverzeichnis das Paket für die Clientbibliothek des Azure Storage-Blobänderungsfeeds für .NET mithilfe des Befehls dotnet add package. In diesem Beispiel fügen wir dem Befehl das --prerelease-Flag hinzu, um die neueste Vorschauversion zu installieren.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Die Codebeispiele in diesem Artikel verwenden auch die Azure Blob Storage- und Azure-Identität-Pakete.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Fügen Sie Anweisungen vom Typ using hinzu.

Fügen Sie Ihrer Codedatei die folgenden using-Anweisungen hinzu:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Erstellen eines Clientobjekts

Um eine Verbindung von der Anwendung mit Blob Storage herzustellen, erstellen Sie eine Instanz der BlobServiceClient-Klasse. Das folgende Beispiel zeigt, wie Sie ein Clientobjekt erstellen und dabei DefaultAzureCredential für die Autorisierung verwenden. Weitere Informationen finden Sie unter Autorisieren des Zugriffs und Herstellen einer Verbindung mit Blob Storage. Um mit dem Änderungsfeed zu arbeiten, benötigen Sie die integrierte Azure RBAC-Rolle Storage-Blobdatenleser oder höher.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

Das Clientobjekt wird als Parameter an einige der in diesem Artikel gezeigten Methoden übergeben.

Lesen von Datensätzen im Änderungsfeed

Hinweis

Der Änderungsfeed ist eine unveränderliche und schreibgeschützte Entität in Ihrem Speicherkonto. Mehrere Anwendungen können den Änderungsfeed gleichzeitig und unabhängig voneinander nach Belieben lesen und verarbeiten. Datensätze werden nicht aus dem Änderungsfeed entfernt, wenn sie von einer Anwendung gelesen werden. Der Lese- oder Iterationszustand jedes Readers ist unabhängig und wird nur von der jeweiligen Anwendung verwaltet.

Im folgenden Codebeispiel werden alle Datensätze im Änderungsfeed durchlaufen, einer Liste hinzugefügt und dann die Liste der Änderungsfeedereignisse zurückgegeben:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Im folgenden Codebeispiel werden einige Werte aus der Liste der Änderungsfeedereignisse ausgegeben:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Fortsetzen des Lesens von Datensätzen ab einer gespeicherten Position

Sie können Ihre Leseposition im Änderungsfeed speichern und dann das Durchlaufen der Datensätze zu einem späteren Zeitpunkt fortsetzen. Sie können die Leseposition speichern, indem Sie den Änderungsfeedcursor abrufen. Dieser Cursor ist eine Zeichenfolge, die Ihre Anwendung entsprechend ihres Entwurfs z. B. in einer Datei oder Datenbank speichern kann.

In diesem Beispiel werden alle Datensätze im Änderungsfeed durchlaufen, einer Liste hinzugefügt, und der Cursor wird gespeichert. Die Liste und der Cursor werden an den Aufrufer zurückgegeben.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Streamverarbeitung von Datensätzen

Sie können Änderungsfeeddatensätze verarbeiten, während sie an den Änderungsfeed commitet werden. Weitere Informationen finden Sie unter Spezifikationen. Änderungsereignisse werden im Durchschnitt nach 60 Sekunden im Änderungsfeed veröffentlicht. Wir empfehlen, bei der Festlegung des Abrufintervalls und des Abrufs neuer Änderungen diesen Zeitraum zu berücksichtigen.

In diesem Beispiel werden regelmäßig Änderungen abgerufen. Wenn Änderungsdatensätze vorhanden sind, verarbeitet dieser Code die Datensätze und speichert den Änderungsfeedcursor. Auf diese Weise kann die Anwendung, wenn der Vorgang angehalten und anschließend neu gestartet wird, mit dem Cursor die Verarbeitung der Datensätze an der Stelle fortsetzen, an der sie unterbrochen wurde. In diesem Beispiel wird der Cursor zu Demonstrationszwecken in einer lokalen Datei gespeichert. Ihre Anwendung kann ihn jedoch in einer beliebigen Form speichern, die für Ihr Szenario am sinnvollsten ist.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Lesen von Datensätzen innerhalb eines bestimmten Zeitraums

Sie können Datensätze lesen, die in einem bestimmten Zeitbereich liegen. In diesem Beispiel werden alle Datensätze im Änderungsfeed durchlaufen, die in einen bestimmten Datums- und Zeitbereich fallen. Außerdem werden sie zu einer Liste hinzugefügt und die Liste zurückgegeben:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Die von Ihnen angegebene Startzeit wird auf die volle Stunde abgerundet, und die Endzeit wird auf die volle Stunde aufgerundet. Möglicherweise werden Benutzern Ereignisse angezeigt, die vor der Startzeit und nach der Endzeit aufgetreten sind. Es ist auch möglich, das einige Ereignisse, die zwischen Start- und Endzeit liegen, nicht angezeigt werden. Das liegt daran, dass Ereignisse möglicherweise in der Stunde vor der Startzeit oder in der Stunde nach der Endzeit aufgezeichnet wurden.

Nächste Schritte