Alterar o modelo de pull de feed no Azure Cosmos DB

APLICA-SE A: NoSQL

Você pode usar o modelo de pull de feed de alteração para consumir o feed de alterações do Azure Cosmos DB no seu próprio ritmo. Semelhante ao processador de feed de alterações, você pode usar o modelo de pull de feed de alterações para paralelizar o processamento de alterações em vários consumidores de feed de alterações.

Compare com o processador de alimentação de alterações

Muitos cenários podem processar o feed de alterações usando o processador de feed de alterações ou o modelo de pull de feed de alterações. Os tokens de continuação do modelo pull e o contêiner de concessão do processador de feed de alterações funcionam como marcadores para o último item processado ou lote de itens no feed de alterações.

No entanto, você não pode converter tokens de continuação em uma concessão ou vice-versa.

Nota

Na maioria dos casos, quando você precisa ler o feed de alterações, a opção mais simples é usar o processador de feed de alterações.

Você deve considerar o uso do modelo pull nestes cenários:

  • Para ler as alterações de uma chave de partição específica.
  • Para controlar o ritmo a que o seu cliente recebe alterações para processamento.
  • Para executar uma leitura única dos dados existentes no feed de alterações (por exemplo, para fazer uma migração de dados).

Aqui estão algumas diferenças importantes entre o processador de alimentação de alterações e o modelo de receção de alimentação de alterações:

Caraterística Processador do feed de alterações Modelo Pull do feed de alterações
Mantendo o controle do ponto atual no processamento do feed de alterações Concessão (armazenada em um contêiner do Azure Cosmos DB) Token de continuação (armazenado na memória ou persistido manualmente)
Capacidade de repetir mudanças passadas Sim, com modelo push Sim, com modelo pull
Sondagem para alterações futuras Verifica automaticamente se há alterações com base no valor especificado WithPollInterval pelo usuário Manual
Comportamento onde não há novas alterações Aguarde automaticamente o valor e WithPollInterval , em seguida, verifique novamente Deve verificar o status e verificar manualmente novamente
Alterações de processo a partir de um contêiner inteiro Sim, e automaticamente paralelizado em vários threads e máquinas que consomem do mesmo contêiner Sim, e paralelizado manualmente usando FeedRange
Alterações de processo a partir de apenas uma única chave de partição Não suportado Sim

Nota

Quando você usa o modelo pull, ao contrário da leitura usando o processador de alimentação de alterações, você deve lidar explicitamente com casos em que não há novas alterações.

Trabalhar com o modelo pull

Para processar o feed de alterações usando o modelo pull, crie uma instância de FeedIterator. Ao criar FeedIteratorinicialmente o , você deve especificar um valor necessário ChangeFeedStartFrom , que consiste na posição inicial para ler as alterações e no valor que deseja usar para FeedRange. O FeedRange é um intervalo de valores de chave de partição e especifica os itens que podem ser lidos a partir do feed de alterações usando esse arquivo .FeedIterator Você também deve especificar um valor necessário ChangeFeedMode para o modo no qual deseja processar as alterações: versão mais recente ou todas as versões e exclusões. Use um ou ChangeFeedMode.LatestVersion ChangeFeedMode.AllVersionsAndDeletes para indicar qual modo você deseja usar para ler o feed de alterações. Ao usar todas as versões e o modo de exclusão, você deve selecionar um início de feed de alteração a partir do valor de um Now() ou de um token de continuação específico.

Opcionalmente, você pode especificar ChangeFeedRequestOptions para definir um PageSizeHintarquivo . Quando definida, essa propriedade define o número máximo de itens recebidos por página. Se as operações na coleção monitorada forem executadas por meio de procedimentos armazenados, o escopo da transação será preservado ao ler itens do feed de alterações. Como resultado, o número de itens recebidos pode ser maior do que o valor especificado para que os itens alterados pela mesma transação sejam retornados como parte de um lote atômico.

Aqui está um exemplo de como obter FeedIterator no modo de versão mais recente que retorna objetos de entidade, neste caso, um User objeto:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Gorjeta

Antes da versão 3.34.0, o modo de versão mais recente pode ser usado definindo ChangeFeedMode.Incremental. LatestVersion Ambos e Incremental consulte o modo de versão mais recente do feed de alterações e os aplicativos que usam qualquer um dos modos verão o mesmo comportamento.

Todas as versões e exclusões do modo está em visualização e pode ser usado com versões de visualização do >SDK do .NET = 3.32.0-preview. Aqui está um exemplo para obter FeedIterator em todas as versões e exclui o modo que retorna User objetos:

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Nota

No modo de versão mais recente, você recebe objetos que representam o item que foi alterado, com alguns metadados extras. Todas as versões e modo de exclusão retorna um modelo de dados diferente. Para obter mais informações, consulte Analisar o objeto de resposta.

Você pode obter o exemplo completo para o modo de versão mais recente ou todas as versões e exclui o modo.

Consuma o feed de alterações através de streams

FeedIterator para ambos os modos de alimentação de mudança tem duas opções. Além dos exemplos que retornam objetos de entidade, você também pode obter a resposta com Stream suporte. Os fluxos permitem que você leia dados sem tê-los primeiro desserializados, para que você economize nos recursos do cliente.

Aqui está um exemplo de como obter FeedIterator no modo de versão mais recente que retorna Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Consumir as alterações de um contêiner inteiro

Se você não fornecer um FeedRange parâmetro para o FeedIterator, poderá processar a alimentação de alteração de um contêiner inteiro no seu próprio ritmo. Aqui está um exemplo, que começa a ler todas as alterações, começando no momento atual usando o modo de versão mais recente:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Como o feed de alterações é efetivamente uma lista infinita de itens que englobam todas as gravações e atualizações futuras, o valor de HasMoreResults é sempre true. Quando você tenta ler o feed de alterações e não há novas alterações disponíveis, você recebe uma resposta com NotModified status. No exemplo anterior, ele é tratado aguardando cinco segundos antes de verificar novamente se há alterações.

Consumir as alterações de uma chave de partição

Em alguns casos, talvez você queira processar apenas as alterações para uma chave de partição específica. Você pode obter FeedIterator uma chave de partição específica e processar as alterações da mesma maneira que pode para um contêiner inteiro.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Usar FeedRange para paralelização

No processador de alimentação de alterações, o trabalho é automaticamente distribuído por vários consumidores. No modelo de pull de feed de alterações, você pode usar o FeedRange para paralelizar o processamento do feed de alterações. A FeedRange representa um intervalo de valores de chave de partição.

Aqui está um exemplo que mostra como obter uma lista de intervalos para seu contêiner:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Quando você obtém uma lista de FeedRange valores para seu contêiner, obtém um FeedRange por partição física.

Usando um FeedRange, você pode criar um FeedIterator para paralelizar o processamento do feed de alterações em várias máquinas ou threads. Ao contrário do exemplo anterior que mostrou como obter um FeedIterator para todo o contêiner ou uma única chave de partição, você pode usar FeedRanges para obter vários FeedIterators, que podem processar o feed de alteração em paralelo.

No caso em que você deseja usar FeedRanges, você precisa ter um processo orquestrador que obtém FeedRanges e os distribui para essas máquinas. Esta distribuição pode ser:

  • Usando FeedRange.ToJsonString e distribuindo esse valor de cadeia de caracteres. Os consumidores podem usar este valor com FeedRange.FromJsonString.
  • Se a distribuição estiver em processo, passando a referência do FeedRange objeto.

Aqui está um exemplo que mostra como ler desde o início da alimentação de mudança do contêiner usando duas máquinas hipotéticas separadas que leem em paralelo:

Computador 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Computador 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Salvar tokens de continuação

Você pode salvar a posição do seu FeedIterator obtendo o token de continuação. Um token de continuação é um valor de cadeia de caracteres que mantém o controle das últimas alterações processadas do seu FeedIterator e permite que o FeedIterator retome neste ponto mais tarde. O token de continuação, se especificado, tem precedência sobre a hora de início e começa a partir dos valores iniciais. O código a seguir lê o feed de alterações desde a criação do contêiner. Depois que não houver mais alterações disponíveis, ele persistirá um token de continuação para que o consumo de feed de alterações possa ser retomado posteriormente.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Quando você estiver usando o modo de versão mais recente, o token de FeedIterator continuação nunca expirará enquanto o contêiner do Azure Cosmos DB ainda existir. Quando você estiver usando todas as versões e o modo de exclusão, o FeedIterator token de continuação será válido desde que as alterações tenham ocorrido dentro da janela de retenção para backups contínuos.

Próximos passos