Alterar o modelo de pull de feed no Azure Cosmos DB
APLICA-SE A: NoSQL
Você pode usar o modelo de pull de feed de alteração para consumir o feed de alterações do Azure Cosmos DB no seu próprio ritmo. Semelhante ao processador de feed de alterações, você pode usar o modelo de pull de feed de alterações para paralelizar o processamento de alterações em vários consumidores de feed de alterações.
Compare com o processador de alimentação de alterações
Muitos cenários podem processar o feed de alterações usando o processador de feed de alterações ou o modelo de pull de feed de alterações. Os tokens de continuação do modelo pull e o contêiner de concessão do processador de feed de alterações funcionam como marcadores para o último item processado ou lote de itens no feed de alterações.
No entanto, você não pode converter tokens de continuação em uma concessão ou vice-versa.
Nota
Na maioria dos casos, quando você precisa ler o feed de alterações, a opção mais simples é usar o processador de feed de alterações.
Você deve considerar o uso do modelo pull nestes cenários:
- Para ler as alterações de uma chave de partição específica.
- Para controlar o ritmo a que o seu cliente recebe alterações para processamento.
- Para executar uma leitura única dos dados existentes no feed de alterações (por exemplo, para fazer uma migração de dados).
Aqui estão algumas diferenças importantes entre o processador de alimentação de alterações e o modelo de receção de alimentação de alterações:
Caraterística | Processador do feed de alterações | Modelo Pull do feed de alterações |
---|---|---|
Mantendo o controle do ponto atual no processamento do feed de alterações | Concessão (armazenada em um contêiner do Azure Cosmos DB) | Token de continuação (armazenado na memória ou persistido manualmente) |
Capacidade de repetir mudanças passadas | Sim, com modelo push | Sim, com modelo pull |
Sondagem para alterações futuras | Verifica automaticamente se há alterações com base no valor especificado WithPollInterval pelo usuário |
Manual |
Comportamento onde não há novas alterações | Aguarde automaticamente o valor e WithPollInterval , em seguida, verifique novamente |
Deve verificar o status e verificar manualmente novamente |
Alterações de processo a partir de um contêiner inteiro | Sim, e automaticamente paralelizado em vários threads e máquinas que consomem do mesmo contêiner | Sim, e paralelizado manualmente usando FeedRange |
Alterações de processo a partir de apenas uma única chave de partição | Não suportado | Sim |
Nota
Quando você usa o modelo pull, ao contrário da leitura usando o processador de alimentação de alterações, você deve lidar explicitamente com casos em que não há novas alterações.
Trabalhar com o modelo pull
Para processar o feed de alterações usando o modelo pull, crie uma instância de FeedIterator
. Ao criar FeedIterator
inicialmente o , você deve especificar um valor necessário ChangeFeedStartFrom
, que consiste na posição inicial para ler as alterações e no valor que deseja usar para FeedRange
. O FeedRange
é um intervalo de valores de chave de partição e especifica os itens que podem ser lidos a partir do feed de alterações usando esse arquivo .FeedIterator
Você também deve especificar um valor necessário ChangeFeedMode
para o modo no qual deseja processar as alterações: versão mais recente ou todas as versões e exclusões. Use um ou ChangeFeedMode.LatestVersion
ChangeFeedMode.AllVersionsAndDeletes
para indicar qual modo você deseja usar para ler o feed de alterações. Ao usar todas as versões e o modo de exclusão, você deve selecionar um início de feed de alteração a partir do valor de um Now()
ou de um token de continuação específico.
Opcionalmente, você pode especificar ChangeFeedRequestOptions
para definir um PageSizeHint
arquivo . Quando definida, essa propriedade define o número máximo de itens recebidos por página. Se as operações na coleção monitorada forem executadas por meio de procedimentos armazenados, o escopo da transação será preservado ao ler itens do feed de alterações. Como resultado, o número de itens recebidos pode ser maior do que o valor especificado para que os itens alterados pela mesma transação sejam retornados como parte de um lote atômico.
Aqui está um exemplo de como obter FeedIterator
no modo de versão mais recente que retorna objetos de entidade, neste caso, um User
objeto:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Gorjeta
Antes da versão 3.34.0
, o modo de versão mais recente pode ser usado definindo ChangeFeedMode.Incremental
. LatestVersion
Ambos e Incremental
consulte o modo de versão mais recente do feed de alterações e os aplicativos que usam qualquer um dos modos verão o mesmo comportamento.
Todas as versões e exclusões do modo está em visualização e pode ser usado com versões de visualização do >SDK do .NET = 3.32.0-preview
. Aqui está um exemplo para obter FeedIterator
em todas as versões e exclui o modo que retorna User
objetos:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Nota
No modo de versão mais recente, você recebe objetos que representam o item que foi alterado, com alguns metadados extras. Todas as versões e modo de exclusão retorna um modelo de dados diferente. Para obter mais informações, consulte Analisar o objeto de resposta.
Você pode obter o exemplo completo para o modo de versão mais recente ou todas as versões e exclui o modo.
Consuma o feed de alterações através de streams
FeedIterator
para ambos os modos de alimentação de mudança tem duas opções. Além dos exemplos que retornam objetos de entidade, você também pode obter a resposta com Stream
suporte. Os fluxos permitem que você leia dados sem tê-los primeiro desserializados, para que você economize nos recursos do cliente.
Aqui está um exemplo de como obter FeedIterator
no modo de versão mais recente que retorna Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Consumir as alterações de um contêiner inteiro
Se você não fornecer um FeedRange
parâmetro para o FeedIterator
, poderá processar a alimentação de alteração de um contêiner inteiro no seu próprio ritmo. Aqui está um exemplo, que começa a ler todas as alterações, começando no momento atual usando o modo de versão mais recente:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Como o feed de alterações é efetivamente uma lista infinita de itens que englobam todas as gravações e atualizações futuras, o valor de HasMoreResults
é sempre true
. Quando você tenta ler o feed de alterações e não há novas alterações disponíveis, você recebe uma resposta com NotModified
status. No exemplo anterior, ele é tratado aguardando cinco segundos antes de verificar novamente se há alterações.
Consumir as alterações de uma chave de partição
Em alguns casos, talvez você queira processar apenas as alterações para uma chave de partição específica. Você pode obter FeedIterator
uma chave de partição específica e processar as alterações da mesma maneira que pode para um contêiner inteiro.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Usar FeedRange para paralelização
No processador de alimentação de alterações, o trabalho é automaticamente distribuído por vários consumidores. No modelo de pull de feed de alterações, você pode usar o FeedRange
para paralelizar o processamento do feed de alterações. A FeedRange
representa um intervalo de valores de chave de partição.
Aqui está um exemplo que mostra como obter uma lista de intervalos para seu contêiner:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Quando você obtém uma lista de FeedRange
valores para seu contêiner, obtém um FeedRange
por partição física.
Usando um FeedRange
, você pode criar um FeedIterator
para paralelizar o processamento do feed de alterações em várias máquinas ou threads. Ao contrário do exemplo anterior que mostrou como obter um FeedIterator
para todo o contêiner ou uma única chave de partição, você pode usar FeedRanges para obter vários FeedIterators, que podem processar o feed de alteração em paralelo.
No caso em que você deseja usar FeedRanges, você precisa ter um processo orquestrador que obtém FeedRanges e os distribui para essas máquinas. Esta distribuição pode ser:
- Usando
FeedRange.ToJsonString
e distribuindo esse valor de cadeia de caracteres. Os consumidores podem usar este valor comFeedRange.FromJsonString
. - Se a distribuição estiver em processo, passando a referência do
FeedRange
objeto.
Aqui está um exemplo que mostra como ler desde o início da alimentação de mudança do contêiner usando duas máquinas hipotéticas separadas que leem em paralelo:
Computador 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Computador 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Salvar tokens de continuação
Você pode salvar a posição do seu FeedIterator
obtendo o token de continuação. Um token de continuação é um valor de cadeia de caracteres que mantém o controle das últimas alterações processadas do seu FeedIterator e permite que o FeedIterator
retome neste ponto mais tarde. O token de continuação, se especificado, tem precedência sobre a hora de início e começa a partir dos valores iniciais. O código a seguir lê o feed de alterações desde a criação do contêiner. Depois que não houver mais alterações disponíveis, ele persistirá um token de continuação para que o consumo de feed de alterações possa ser retomado posteriormente.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Quando você estiver usando o modo de versão mais recente, o token de FeedIterator
continuação nunca expirará enquanto o contêiner do Azure Cosmos DB ainda existir. Quando você estiver usando todas as versões e o modo de exclusão, o FeedIterator
token de continuação será válido desde que as alterações tenham ocorrido dentro da janela de retenção para backups contínuos.