Alterar o modelo de pull do feed de alterações no Azure Cosmos DB

APLICA-SE A: NoSQL

Com o modelo de pull do feed de alterações, você pode consumir o feed de alterações do Azure Cosmos DB em seu próprio ritmo. Do mesmo modo do que já é possível fazer com o processador do feed de alterações, você pode usar o modelo de pull do feed de alterações para paralelizar o processamento de alterações entre vários consumidores de feed de alterações.

Comparar com o processador do feed de alterações

Muitos cenários podem processar o feed de alterações usando o processador do feed de alterações ou o modelo de pull do feed de alterações. Os tokens de continuação do modelo de pull e o contêiner de concessão do processador de feed de alterações atuam como indicadores para o último item processado ou lote de itens no feed de alterações.

No entanto, não é possível converter tokens de continuação em uma concessão ou vice-versa.

Observação

Na maioria dos casos, quando você precisa ler do feed de alterações, a opção mais simples é usar o processador do feed de alterações.

Você deve considerar o uso do modelo de pull nestes cenários:

  • Para ler as alterações de uma chave de partição específica.
  • Para controlar o ritmo no qual seu cliente recebe alterações para processamento.
  • Para fazer uma leitura única dos dados existentes no feed de alterações (por exemplo, fazer uma migração de dados).

Aqui estão algumas diferenças importantes entre o processador do feed de alterações e o modelo de pull do feed de alterações:

Recurso Alterar o processador de feed Modelo de pull do feed de alterações
Acompanhar o ponto atual no processamento do feed de alterações Concessão (armazenada em um contêiner do Azure Cosmos DB) Token de continuação (armazenado na memória ou persistido manualmente)
Capacidade de reproduzir alterações passadas Sim, com o modelo de push Sim, com o modelo de pull
Sondagem para alterações futuras Verifica automaticamente se há alterações com base no valor WithPollInterval especificado pelo usuário Manual
Comportamento em que não há novas alterações Aguarde automaticamente o valor para WithPollInterval e, em seguida, verifique novamente Deve verificar o status e verificar novamente manualmente
Processar alterações de todo o contêiner Sim, e paralelizado automaticamente em vários threads e computadores consumindo do mesmo contêiner Sim, e paralelizado manualmente usando FeedRange
Processar alterações de apenas uma chave de partição Sem suporte Sim

Observação

Ao contrário da leitura usando o processador do feed de alterações, ao usar o modelo de pull você deve lidar explicitamente com casos em que não há novas alterações.

Trabalhando com o modelo de pull

Para processar o feed de alterações usando o modelo de pull, crie uma instância de FeedIterator. Ao criar inicialmente um FeedIterator, você deve especificar um valor de ChangeFeedStartFrom obrigatório, que consiste na posição inicial para ler as alterações e o FeedRange desejado. O FeedRange é um intervalo de valores de chave de partição e especifica os itens que podem ser lidos do feed de alterações usando esse FeedIterator específico. Você também deve especificar um valor de ChangeFeedMode necessário para o modo no qual deseja processar alterações: versão mais recente ou todas as versões e exclusões. Use ChangeFeedMode.LatestVersionou ChangeFeedMode.AllVersionsAndDeletes para indicar em qual modo você deseja ler o feed de alterações. Ao usar o modo todas as versões e exclusões, você deve selecionar um início do feed de alterações do valor de Now() ou de um token de continuação específico.

Opcionalmente, você pode especificar ChangeFeedRequestOptions para definir um PageSizeHint. Quando definida, essa propriedade define o número máximo de itens recebidos por página. Se as operações na coleção monitorada forem executadas por meio de procedimentos armazenados, o escopo da transação será preservado durante a leitura de itens do feed de alterações. Como resultado, o número de itens recebidos pode ser maior que o valor especificado para que os itens alterados pela mesma transação sejam retornados como parte de um lote atômico.

Aqui está um exemplo para obter um FeedIterator no modo de versão mais recente que retorna objetos de entidade, neste caso, um objeto User:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Dica

Antes da versão 3.34.0, o modo de versão mais recente pode ser usado definindo ChangeFeedMode.Incremental. Tanto IncrementalquantoLatestVersion referem-se ao modo de versão mais recente do feed de alterações e os aplicativos que usam qualquer um dos modos verão o mesmo comportamento.

O modo de todas as versões e exclusões está em versão prévia e pode ser usado com a versão >= 3.32.0-preview do SDK do .NET. Aqui está um exemplo para obter FeedIterator em todas as versões e excluir o modo que retorna objetos User:

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Observação

No modo de versão mais recente, você receberá objetos que representam o item que foi alterado com alguns metadados extras. O modo de todas as versões e exclusões retorna um modelo de dados diferente. Para obter mais informações, consulte Analisar o objeto de resposta.

Você pode obter o exemplo completo para modo de versão mais recente ou modo de todas as versões e exclusões.

Consumir o feed de alterações por meio de fluxos

FeedIterator para ambos os modos de feed de alterações existem duas opções. Além dos exemplos que retornam objetos de entidade, você também pode obter a resposta com suporte a Stream. Os fluxos permitem que você leia os dados sem necessidade de desserializá-los primeiro, economizando recursos do cliente.

Aqui está um exemplo para obter FeedIterator no modo de versão mais recente que retorna Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Consumir as alterações para um contêiner inteiro

Se você não fornecer um parâmetro FeedRange para FeedIterator, poderá processar o feed de alterações de um contêiner inteiro em seu próprio ritmo. Aqui está um exemplo que começa a ler todas as alterações começando no momento atual, usando o modo de versão mais recente:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Como o feed de alterações é efetivamente uma lista infinita de itens que abrangem todas as futuras gravações e atualizações, o valor de HasMoreResults é sempre true. Ao tentar ler o feed de alterações e não houver nenhuma nova alteração disponível, você recebe uma resposta com o status NotModified. No exemplo acima, ela é tratada aguardando cinco segundos antes de verificar novamente se há alterações.

Consumir as alterações para uma chave de partição

Em alguns casos, talvez você queira processar apenas as alterações para uma chave de partição específica. Você pode obter FeedIterator para uma chave de partição específica e processar as alterações da mesma maneira que pode fazê-lo para um contêiner inteiro.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Usar FeedRange para paralelização

No processador do feed de alterações, o trabalho é distribuído automaticamente entre vários consumidores. No modelo de pull do feed de alterações, você pode usar o FeedRange para paralelizar o processamento do feed de alterações. Um FeedRange representa um intervalo de valores de chave de partição.

Aqui está um exemplo que mostra como obter uma lista de intervalos para o contêiner:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Ao obter a lista de valores FeedRange para seu contêiner, você obtém um FeedRange por partição física.

Usando um FeedRange, você pode criar um FeedIterator para paralelizar o processamento do feed de alterações em vários computadores ou threads. Ao contrário do exemplo anterior, que mostrou como obter um FeedIterator para o contêiner inteiro ou uma única chave de partição, é possível usar FeedRanges para obter vários FeedIterators que podem processar o feed de alterações em paralelo.

No caso em que você deseja usar o FeedRanges, você precisa ter um processo orquestrador que obtém FeedRanges e os distribui para esses computadores. Essa distribuição pode ser:

  • Usando FeedRange.ToJsonString e distribuindo esse valor de cadeia de caracteres. Os consumidores podem usar esse valor com FeedRange.FromJsonString.
  • Se a distribuição estiver em processo, passando a referência do objeto FeedRange.

Aqui está uma amostra que mostra como ler desde o início do feed de alterações do contêiner usando dois computadores hipotéticos separados que estão realizando a leitura em paralelo:

Computador 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Computador 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Salvar tokens de continuação

Você pode salvar a posição de seu FeedIterator obtendo o token de continuação. Um token de continuação é um valor de cadeia de caracteres que mantém o controle das últimas alterações processadas do FeedIterator e permite que FeedIterator retome nesse ponto mais tarde. O token de continuação, se especificado, tem precedência sobre os valores da Hora de início e Começar do início. O código a seguir lê o feed de alterações desde a criação do contêiner. Depois que não houver mais alterações disponíveis, ele manterá um token de continuação para que o consumo do feed de alterações possa ser retomado posteriormente.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Quando você estiver usando o modo de versão mais recente, o token de continuação FeedIterator nunca expirará enquanto o contêiner do Azure Cosmos DB ainda existir. Quando você estiver usando o modo todas as versões e exclusões, o token de continuação FeedIterator é válido desde que as alterações tenham ocorrido dentro da janela de retenção para backups contínuos.

Próximas etapas