Processador do feed de alterações no Azure Cosmos DB

APLICA-SE A: NoSQL

O processador do feed de alterações faz parte dos SDKs do .NET V3 e do Java V4 do Azure Cosmos DB. Ele simplifica o processo de leitura do feed de alterações e distribui o processamento de eventos entre vários consumidores com eficiência.

O principal benefício do uso do processador do feed de alterações é seu design tolerante a falhas, o que garante entrega "pelo menos uma vez" de todos os eventos no feed de alterações.

SDKs com suporte

.Net V3 Java Node.JS Python

Componentes do processador do feed de alterações

O processador do feed de alterações tem quatro componentes principais:

  • Contêiner monitorado: o contêiner monitorado tem os dados com base nos quais o feed de alterações é gerado. Todas as inserções e atualizações no contêiner monitorado são refletidas no feed de alterações do contêiner.

  • Contêiner de concessão: o contêiner de concessão atua como armazenamento de estado e coordena o processamento do feed de alterações entre vários trabalhadores. O contêiner de concessão pode ser armazenado na mesma conta que o contêiner monitorado ou em uma conta separada.

  • Instância de computação: uma instância de computação hospeda o processador do feed de alterações para escutar as alterações. Dependendo da plataforma, ela pode ser representada por uma máquina virtual (VM), um pod do Kubernetes, uma instância do Serviço de Aplicativo do Azure ou um computador físico real. A instância de computação tem um identificador exclusivo chamado nome da instância neste artigo.

  • Delegado: o delegado é o código que define o que você, desenvolvedor, deseja fazer com cada lote de alterações lido pelo processador do feed de alterações.

Para melhor entender como esses quatro elementos do processador do feed de alterações funcionam juntos, vejamos um exemplo no diagrama a seguir. O contêiner monitorado armazena documentos e usa "Cidade" como chave de partição. Os valores da chave de partição são distribuídos em intervalos (cada intervalo representa uma partição física) que contêm itens.

O diagrama mostra duas instâncias de computação e o processador do feed de alterações atribui intervalos diferentes a cada instância para maximizar a distribuição da computação. Cada instância tem um nome diferente e exclusivo.

Cada intervalo é lido em paralelo. O progresso de um intervalo é mantido separadamente de outros intervalos no contêiner de concessão por um documento de concessão. A combinação das concessões representa o estado atual do processador de feed de alterações.

Exemplo de processador do feed de alterações

Implementar o processador do feed de alterações

O processador de feed de alterações no .NET está disponível para o modo de versão mais recente e modo de todas as versões e exclusões. Todos os modos de versões e exclusões estão em versão prévia e têm suporte para o processador do feed de alterações a partir da versão 3.40.0-preview.0. O ponto de entrada para ambos os modos é sempre o contêiner monitorado.

Para ler usando o modo de versão mais recente, em uma instância Container, você chama GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Para ler usando o modo de todas as versões e exclusões, chame GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes da instância Container:

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Para ambos os modos, o primeiro parâmetro é um nome distinto que descreve a meta deste processador. O segundo nome é a implementação do delegado que manipula as alterações.

Aqui está um exemplo de um delegado para o modo de versão mais recente:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Aqui está um exemplo de um delegado para o modo de todas as versões e exclusões:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Posteriormente, você define o nome da instância de computação ou o identificador exclusivo usando WithInstanceName. O nome da instância de computação deve ser exclusivo e diferente para cada instância de computação que você está implantando. Você define o contêiner para manter o estado de concessão usando WithLeaseContainer.

Chamar Build fornecerá a instância do processador que você pode iniciar chamando StartAsync.

Observação

Os snippets de código anteriores são obtidos de exemplos no GitHub. Você pode obter o exemplo para o modo de versão mais recente ou para o modo de todas as versões e exclusões.

Ciclo de vida do processamento

O ciclo de vida normal de uma instância de host é:

  1. Ler o feed de alterações.
  2. Se não houver alterações, suspenda por um período predefinido (personalizável usando WithPollInterval no Construtor) e acesse o nº 1.
  3. Se houver alterações, envie-as para o delegado.
  4. Quando o delegado terminar de processar as alterações com êxito, atualize o repositório de concessão com o último ponto processado no tempo e vá para 1.

Tratamento de erros

O processador do feed de alterações é resiliente aos erros de código do usuário. Se a implementação do seu representante tiver uma exceção não tratada (Etapa nº 4), o thread que está processando esse lote específico de alterações será interrompido e um novo thread será criado. O novo thread verifica o ponto no tempo mais recente que o armazenamento de concessão salvou para esse intervalo de valores de chave de partição. O novo thread é reiniciado a partir daí, enviando efetivamente o mesmo lote de alterações para o representante. Esse comportamento continuará até que seu representante processe as alterações corretamente e, por isso, o processador do feed de alterações tem uma garantia de "pelo menos uma vez".

Observação

Um lote de alterações não é repetido em apenas um cenário. Se a falha ocorrer na primeira execução de delegado, o repositório de concessão não terá nenhum estado prévio salvo para ser usado na repetição. Nesses casos, a repetição usa a configuração inicial, que pode ou não incluir o último lote.

Para impedir que o processador do feed de alterações fique "preso" repetindo continuamente o mesmo lote de alterações, adicione lógica ao código delegado para gravar documentos, mediante exceção, em uma fila de mensagens com erros. Esse design garante o controle das alterações não processadas enquanto ainda pode continuar a processar alterações futuras. A fila de mensagens com erros pode ser outro contêiner do Azure Cosmos DB. O armazenamento de dados exato não importa. Você só quer que as alterações não processadas sejam mantidas.

Você também pode usar o avaliador do feed de alterações para monitorar o progresso das instâncias do seu processador do feed de alterações à medida que elas leem o feed de alterações, ou você pode usar as notificações do ciclo de vida para detectar falhas adjacentes.

Notificações do ciclo de vida

Você pode conectar o processador do feed de alterações a qualquer evento relevante no seu ciclo de vida. Você pode optar por ser notificado sobre um ou todos eles. A recomendação é registrar pelo menos a notificação de erro:

  • Registre um manipulador do WithLeaseAcquireNotification para ser notificado quando o host atual adquirir uma concessão para começar a processá-la.
  • Registre um manipulador do WithLeaseReleaseNotification para ser notificado quando o host atual liberar uma concessão e parar de processá-la.
  • Registre um manipulador de WithErrorNotification para ser notificado quando o host atual encontrar uma exceção durante o processamento. Você deve conseguir distinguir se a origem é o representante do usuário (uma exceção não tratada) ou um erro que o processador encontra ao tentar acessar o contêiner monitorado (por exemplo, problemas de rede).

As notificações do ciclo de vida estão disponíveis em ambos os modos do feed de alterações. Aqui está um exemplo de notificações do ciclo de vida no modo de versão mais recente:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Unidade de implantação

Uma única unidade de implantação do processador do feed de alterações consiste de uma ou mais instâncias de computação que têm o mesmo valor de processorName e a mesma configuração de contêiner de concessão, mas cada uma com nomes de instância diferentes. Você pode ter várias unidades de implantação em que cada uma tem um fluxo de negócios diferente para as alterações e cada unidade de implantação que consiste em uma ou mais instâncias.

Por exemplo, você pode ter uma unidade de implantação que aciona uma API externa sempre que houver uma alteração no seu contêiner. Outra unidade de implantação pode mover dados em tempo real cada vez que houver uma alteração. Quando ocorre uma alteração no seu contêiner monitorado, todas as suas unidades de implantação serão notificadas.

Escala dinâmica

Conforme mencionado anteriormente, você pode ter uma ou mais instâncias de computação em uma unidade de implementação. Para aproveitar a distribuição de computação na unidade de implantação, os únicos requisitos principais são:

  • Todas as instâncias devem ter a mesma configuração do contêiner de concessão.
  • Todas as instâncias devem ter o mesmo valor de processorName.
  • Cada instância precisa ter um nome de instância diferente (WithInstanceName).

Se essas três condições se aplicarem, o processador do feed de alterações distribuirá todas as concessões que estão no contêiner de concessões entre todas as instâncias executadas dessa unidade de implantação e paralelizará a computação usando um algoritmo de distribuição igual. Uma concessão só pode pertencer a uma instância em um determinado momento, por isso, o número máximo de instâncias não deve ser maior que o número de concessões.

O número de instâncias pode aumentar e diminuir. O processador do feed de alterações ajusta dinamicamente a carga redistribuindo de acordo.

Além disso, o processador do feed de alterações pode ajustar dinamicamente a escala de um contêiner se a taxa de transferência ou o armazenamento do contêiner aumentar. Quando seu contêiner se expande, o processador do feed de alterações lida com o cenário de forma transparente, aumentando dinamicamente as concessões e distribuindo as novas concessões entre as instâncias existentes.

Hora de início

Por padrão, quando um processador do feed de alterações fore iniciado pela primeira vez, ele inicializará o contêiner de concessão e iniciará seu ciclo de vida do processamento. As alterações ocorridas no contêiner monitorado antes de o processador do feed de alterações ter sido inicializado pela primeira vez não serão detectadas.

Lendo de uma data e hora anteriores

É possível inicializar o processador do feed de alterações para ler as alterações de uma data e hora específicas, passando uma instância de DateTime para a extensão do construtor WithStartTime:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

O processador do feed de alterações é inicializado para essa data e hora específicas e começa a ler as alterações que ocorreram depois.

Lendo desde o início

Em outros cenários, como nas migrações de dados ou se você estiver analisando o histórico de um contêiner, será necessário ler o feed de alterações desde o início do tempo de vida desse contêiner. Você pode usar WithStartTime na extensão do construtor, mas passar DateTime.MinValue.ToUniversalTime(), o que gera a representação UTC do valor mínimo de DateTime como no exemplo a seguir:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

O processador do feed de alterações é inicializado e começa a ler as alterações desde o início do tempo de vida do contêiner.

Observação

Essas opções de personalização funcionam somente para configurar o ponto de partida no tempo do processador do feed de alterações. Depois que o contêiner de concessão for inicializado pela primeira vez, alterar essas opções não terá efeito.

A personalização do ponto de partida só está disponível para o modo de feed de alterações de versão mais recente. Ao usar o modo de todas as versões e exclusões, você deve começar a ler a partir do momento em que o processador é iniciado ou retomar de um estado de concessão anterior que esteja dentro do período de retenção de backup contínuo da sua conta.

Feed de alterações e taxa de transferência provisionada

As operações de leitura do feed de alterações no contêiner monitorado consumirão unidades de solicitação. Certifique-se de que seu contêiner monitorado não esteja enfrentando limitação. A limitação adiciona atrasos no recebimento dos eventos do feed de alterações nos seus processadores.

As operações no contêiner de concessão (atualizando e mantendo o estado) consomem unidades de solicitação. Quanto maior o número de instâncias usando o mesmo contêiner de concessão, maior será o consumo potencial de unidades de solicitação. Certifique-se de que o contêiner de concessão não esteja enfrentando limitações. A limitação adiciona atrasos no recebimento dos eventos do feed de alterações. A limitação pode até mesmo encerrar completamente o processamento.

Compartilhar o contêiner de concessão

Você pode compartilhar um contêiner de concessão em várias unidades de implantação. Em um contêiner de concessão compartilhado, cada unidade de implantação escuta um contêiner monitorado diferente ou tem um valor diferente para processorName. Nessa configuração, cada unidade de implantação mantém um estado independente no contêiner de concessão. Examine o consumo de unidade de solicitação em um contêiner de concessão para certificar-se de que a taxa de transferência provisionada seja suficiente para todas as unidades de implantação.

Configuração de concessão avançada

Três configurações importantes podem afetar o funcionamento do processador do feed de alterações. Cada configuração afeta o consumo de unidade de solicitação no contêiner de concessão. Você pode definir uma dessas configurações ao criar o processador do feed de alterações, mas use-as com cuidado:

  • Aquisição de concessão: por padrão, a cada 17 segundos. Um host verifica periodicamente o estado do repositório de concessão e considera a aquisição das concessões como parte do processo de dimensionamento dinâmico. Esse processo é feito executando uma Consulta no contêiner de concessão. Reduzir esse valor torna o rebalanceamento e a aquisição de concessões mais rápidos, mas aumenta o consumo de unidade de solicitação no contêiner de concessão.
  • Expiração da concessão: por padrão, 60 segundos. Define a quantidade máxima de tempo que uma concessão pode existir sem qualquer atividade de renovação antes de ser adquirida por outro host. Quando um host falha, as concessões que ele possuía são coletadas por outros hosts após esse período de tempo mais o intervalo de renovação configurado. Reduzir esse valor faz com que a recuperação após uma falha de host seja mais rápida, mas o valor de expiração nunca deve ser menor do que o intervalo de renovação.
  • Renovação da concessão: por padrão, a cada 13 segundos. Um host que possui uma concessão renova a concessão periodicamente, mesmo que não haja novas alterações a serem consumidas. Esse processo é feito executando uma substituição na concessão. Reduzir esse valor reduz o tempo necessário para detectar as concessões perdidas por falhas de host, mas aumenta o consumo de unidade de solicitação no contêiner de concessão.

Onde hospedar o processador do feed de alterações

O processador do feed de alterações pode ser hospedado em qualquer plataforma que dê suporte a processos ou tarefas de execução prolongada. Estes são alguns exemplos:

Embora o processador do feed de alterações possa ser executado nos ambientes de curta duração porque o contêiner de concessão mantém o estado, o ciclo de inicialização desses ambientes acrescenta atrasos ao tempo necessário para receber notificações (devido à sobrecarga de iniciar o processador toda vez que o ambiente é iniciado).

Requisitos de acesso baseado em função

Ao usar o Microsoft Entra ID como mecanismo de autenticação, certifique-se de que a identidade tenha as permissões adequadas:

  • No contêiner monitorado:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • No contêiner de concessão:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Recursos adicionais

Próximas etapas

Saiba mais sobre o processador do feed de alterações nos seguintes artigos: