Procesadores de fuente de cambios de Azure Cosmos DB

SE APLICA A: NoSQL

El procesador de fuente de cambios es parte de los SDK .NET V3 and Java V4 de Azure Cosmos DB. Simplifica el proceso de lectura de la fuente de cambios y distribuye el procesamiento de eventos entre varios consumidores de manera eficaz.

La principal ventaja de usar el procesador de fuente de cambios es su diseño tolerante a errores, que garantiza una entrega "al menos una vez" de todos los eventos de la fuente de cambios.

SDK admitidos

.Net V3 Java Node.JS Python

Componentes del procesador de fuente de cambios

El procesador de fuente de cambios tiene cuatro componentes principales:

  • El contenedor supervisado: el contenedor supervisado tiene los datos a partir de los cuales se genera la fuente de cambios. Todas las inserciones y actualizaciones realizadas en el contenedor supervisado se reflejan en la fuente de cambios del contenedor.

  • Contenedor de concesión: el contenedor de concesión actúa como un almacenamiento de estado y coordina el procesamiento de la fuente de cambios entre varios trabajos. El contenedor de concesión se puede almacenar en la misma cuenta que el contenedor supervisado o en una cuenta independiente.

  • Instancia de proceso: una instancia de proceso hospeda el procesador de fuente de cambios para escuchar los cambios. En función de la plataforma, se puede representar mediante una máquina virtual (VM), un pod de Kubernetes, una instancia de Azure App Service o una máquina física real. La instancia de proceso tiene un identificador único que se denomina nombre de instancia en este artículo.

  • El delegado: el delegado es el código que define lo que usted, el desarrollador, desea hacer con cada lote de cambios que el procesador de la fuente de cambios lea.

Para comprender mejor cómo funcionan estos cuatro elementos del procesador de fuente de cambios juntos, echemos un vistazo a un ejemplo en el diagrama siguiente. El contenedor supervisado almacena elementos y usa "City" como clave de partición. Los valores de la clave de partición se distribuyen en rangos (cada rango representa una partición física) que contienen elementos.

En el diagrama se muestran dos instancias de proceso, y el procesador de fuente de cambios asigna intervalos diferentes a cada instancia para maximizar la distribución de proceso. Cada instancia tiene un nombre único y diferente.

Cada rango se lee en paralelo. El progreso de cada rango se mantiene por separado de otros rangos en el contenedor de concesión mediante un documento de concesión. La combinación de las concesiones representa el estado actual del procesador de fuente de cambios.

Ejemplo de procesador de fuente de cambios

Implementación del procesador de fuente de cambios

El procesador de fuente de cambios de .NET está disponible para modo de versión más reciente y todas las versiones y elimina el modo. Todas las versiones y el modo de eliminaciones están en versión preliminar y se admiten para el procesador de fuente de cambios a partir de la versión 3.40.0-preview.0. El punto de entrada para ambos modos siempre es el contenedor supervisado.

Para leer con el modo de versión más reciente, en una instancia Container, llame a GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Para leer con todas las versiones y el modo de eliminaciones, llame a GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes desde la instancia Container:

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

En ambos modos, el primer parámetro es un nombre distinto que describe el objetivo de este procesador. El segundo nombre es la implementación del delegado que controla los cambios.

Este es un ejemplo de un delegado para el modo de versión más reciente:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Este es un ejemplo de un delegado para todas las versiones y el modo de eliminaciones:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Después, se define el nombre de la instancia de proceso o el identificador único mediante WithInstanceName. El nombre de la instancia de proceso debe ser único y diferente para cada instancia de proceso que vaya a implementar. Debe establecer el contenedor para mantener el estado de concesión mediante WithLeaseContainer.

La llamada a Build le proporciona la instancia del procesador que puede iniciar mediante una llamada a StartAsync.

Nota:

Los fragmentos de código anteriores se han tomado de ejemplos en GitHub. Puede encontrar el ejemplo para el modo de versión más reciente o para el modo de todas las versiones y eliminaciones.

Ciclo de vida de procesamiento

El ciclo de vida normal de una instancia de host es:

  1. Leer la fuente de cambios.
  2. Si no hay ningún cambio, se mantiene en suspensión durante un periodo de tiempo predefinido (personalizable con WithPollInterval en el generador) y se vuelve al primer paso.
  3. Si hay cambios, enviarlos al delegado.
  4. Cuando el delegado termina de procesar los cambios correctamente, actualizar el almacén de concesión con el último punto en el tiempo e ir al nº 1.

Control de errores

El procesador de fuente de cambios es resistente a los errores de código de usuario. Si la implementación del delegado tiene una excepción no controlada (paso nº 4), el subproceso que está procesando ese lote específico de cambios se detiene y finalmente se crea un nuevo subproceso. El nuevo subproceso comprueba el último punto en el tiempo que el almacén de concesiones ha guardado para ese intervalo de valores de clave de partición. El nuevo subproceso se reiniciará desde allí, enviando de forma eficaz el mismo lote de cambios al delegado. Este comportamiento continuará hasta que el delegado procese los cambios correctamente, y es el motivo por el que el procesador de fuente de cambios tiene una garantía de "al menos una vez".

Nota:

Solo hay un escenario en el que no se reintentará un lote de cambios. Si el error se produce en la primera ejecución del delegado, el almacén de concesiones no tiene ningún estado guardado anterior para usarlo en el reintento. En esos casos, el reintento usa la primera configuración de inicio, que podría incluir o no el último lote.

Para evitar que el procesador de fuente de cambios se "atasque" al reintentar continuamente el mismo lote de cambios, debe agregar lógica al código de delegado para escribir documentos, en caso de excepción, en una cola de mensajes fallidos. Este diseño garantiza que se pueda realizar un seguimiento de los cambios sin procesar a la vez que se siguen procesando cambios futuros. La cola de mensajes fallidos podría ser otro contenedor de Azure Cosmos DB. El almacén de datos exacto no importa, sino simplemente que se conserven los cambios sin procesar.

Además, puede usar el estimador de la fuente de cambios para supervisar el progreso de las instancias del procesador de la fuente de cambios a medida que leen la fuente de cambios, o bien puede usar la notificación del ciclo de vida para detectar errores subyacentes.

Notificaciones de ciclo de vida

Puede conectar el procesador de fuente de cambios a cualquier evento relevante en su ciclo de vida. Puede optar por recibir notificaciones de uno o de todos ellos. La recomendación es registrar al menos la notificación del error:

  • Registre un controlador para que WithLeaseAcquireNotification reciba una notificación cuando el host actual adquiera una concesión para empezar a procesarlo.
  • Registre un controlador para que WithLeaseReleaseNotification reciba una notificación cuando el host actual libere una concesión y deje de procesarla.
  • Registre un controlador de WithErrorNotification para recibir una notificación cuando el host actual encuentre una excepción durante el procesamiento. Debe poder distinguir si el origen es el delegado de usuario (una excepción no controlada) o un error que el procesador encuentra cuando intenta acceder al contenedor supervisado (por ejemplo, problemas de red).

Las notificaciones del ciclo de vida están disponibles en ambos modos de fuente de cambios. Este es un ejemplo de notificaciones del ciclo de vida en el modo de versión más reciente:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Unidad de implementación

Una sola unidad de implementación de procesador de fuente de cambios está formada por una o varias instancias de proceso con el mismo valor de processorName y la misma configuración de contenedor de concesión, pero con un nombre de instancia diferente en cada caso. Puede tener muchas unidades de implementación donde cada una tiene un flujo de negocio diferente para los cambios y cada unidad de implementación que consta de una o varias instancias.

Por ejemplo, podría tener una unidad de implementación que desencadene una API externa cada vez que se produzca un cambio en el contenedor. Otra unidad de implementación podría trasladar datos, en tiempo real cada vez que se produzca un cambio. Cuando se produzca un cambio en el contenedor supervisado, todas las unidades de implementación recibirán una notificación.

Escalado dinámico

Como se mencionó antes, en una unidad de implementación puede tener una o más instancias de proceso. Para beneficiarse de la distribución de proceso dentro de la unidad de implementación, los únicos requisitos clave son los siguientes:

  • Todas las instancias deben tener la misma configuración de contenedor de concesión.
  • Todas las instancias deben tener el mismo valor de processorName.
  • Cada instancia tiene que tener un nombre de instancia diferente (WithInstanceName).

Si estas tres condiciones son aplicables, el procesador de fuente de cambios, mediante el uso de un algoritmo de distribución equitativa, distribuirá todas las concesiones en el contenedor de concesiones en todas las instancias en ejecución de esa unidad de implementación y paralizará el proceso. Una concesión es propiedad de una instancia en un momento dado, por lo que el número máximo de instancias no debería ser superior al número de concesiones.

El número de instancias puede crecer y reducirse. El procesador de fuente de cambios ajusta dinámicamente la carga redistribuyéndola en consecuencia.

Además, el procesador de fuente de cambios puede ajustarse de forma dinámica a la escala del contenedor si aumenta el rendimiento o el almacenamiento de este. Cuando el contenedor crece, el procesador de fuente de cambios controla de forma transparente estos escenarios al aumentar dinámicamente las concesiones y distribuir las nuevas concesiones entre las instancias existentes.

Hora de inicio

De manera predeterminada, cuando se inicie un procesador de fuente de cambios por primera vez, inicializará el contenedor de concesiones e iniciará su ciclo de vida de procesamiento. No se detectó ningún cambio ocurrido en el contenedor supervisado antes de que se inicializara por primera vez el procesador de fuente de cambios.

Lectura desde una fecha y hora anteriores

Es posible inicializar el procesador de fuente de cambios para leer los cambios a partir de una fecha y hora específicas; para ello, pase una instancia de DateTime a la extensión del generador WithStartTime:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

El procesador de fuente de cambios se inicializará para esa fecha y hora específicas y comenzará a leer los cambios que se produzcan después.

Lectura desde el principio

En otros escenarios, como las migraciones de datos o el análisis de todo el historial de un contenedor, es necesario leer la fuente de cambios desde el principio de la vigencia de ese contenedor. Para ello, podemos usar WithStartTime en la extensión del generador, pero pasando DateTime.MinValue.ToUniversalTime(), de forma que se genere la representación UTC del valor DateTime mínimo, tal como se muestra en el siguiente ejemplo:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

El procesador de fuente de cambios se inicializará y comenzará a leer los cambios desde el principio de la vigencia del contenedor.

Nota

Estas opciones de personalización solo funcionan para configurar el momento dado inicial del procesador de fuente de cambios. Una vez que el contenedor de concesiones se inicialice por primera vez, no tendrá ningún efecto cambiar las opciones.

La personalización del punto de partida solo está disponible para el modo de fuente de cambios de versión más reciente. Al usar todas las versiones y el modo de eliminaciones, debe empezar a leer desde el momento en que se inicia el procesador o reanudar desde un estado de concesión anterior que se encuentra dentro del copia de seguridad continua período de retención de la cuenta.

Fuente de cambios y rendimiento aprovisionado

Las operaciones de lectura de fuente de cambios en el contenedor supervisado consumen unidades de solicitud. Asegúrese de que el contenedor supervisado no está experimentando ninguna limitación. Una limitación incrementará los retrasos en la recepción de eventos de fuente de cambios en los procesadores.

Las operaciones en el contenedor de concesión (actualización y mantenimiento del estado) consumen unidades de solicitud. Cuanto mayor sea el número de instancias que usan el mismo contenedor de concesión, mayor será el posible consumo de unidades de solicitud. Asegúrese de que el contenedor de concesión no está experimentando ninguna limitación. Una limitación incrementará los retrasos en la recepción de eventos de fuente de cambios. Una limitación puede incluso finalizar completamente el procesamiento.

Uso compartido del contenedor de concesión

Puede compartir un contenedor de concesión en varias unidades de implementación. En un contenedor de concesión compartido, cada unidad de implementación escucha un contenedor supervisado diferente o tiene un valor diferente de processorName. Con esta configuración, cada unidad de implementación mantiene un estado independiente en el contenedor de concesión. Revise el consumo de unidades de solicitud en el contenedor de concesión para asegurarse de que el rendimiento aprovisionado es suficiente para todas las unidades de implementación.

Configuración avanzada de concesiones

Tres configuraciones clave pueden afectar a cómo funciona el procesador de fuente de cambios. Cada configuración afecta al consumo de unidades de solicitud en el contenedor de concesión. Puede establecer una de estas configuraciones al crear el procesador de fuente de cambios, pero úselas cuidadosamente:

  • Adquisición de concesiones: de forma predeterminada, cada 17 segundos. Un host comprueba periódicamente el estado del almacén de concesiones y considera la posibilidad de adquirir concesiones como parte del proceso de escalado dinámico. Este proceso se realiza ejecutando una consulta en el contenedor de concesiones. Reducir este valor hará que el reequilibrio y la adquisición de concesiones sea más rápido, pero aumente el consumo de unidades de solicitud en el contenedor de concesiones.
  • Expiración de concesiones: de forma predeterminada, 60 segundos. Define la cantidad máxima de tiempo que podrá existir una concesión sin ninguna actividad de renovación antes de ser adquirida por otro host. Cuando un host se bloquee, otros hosts recogerán las concesiones que posea después de este período de tiempo más el intervalo de renovación configurado. Reducir este valor hará que la recuperación después de que un host se bloquee sea más rápida, pero el valor de expiración nunca debería ser inferior al intervalo de renovación.
  • Renovación de concesiones: de forma predeterminada, cada 13 segundos. Un host que sea propietario de una concesión la renovará periódicamente incluso aunque no hubiera nuevos cambios que consumir. Este proceso se realiza mediante la ejecución de un reemplazo en la concesión. Al reducir este valor, se reducirá el tiempo necesario para detectar concesiones perdidas por bloqueo de host, pero se aumentará el consumo de unidades de solicitud en el contenedor de concesiones.

Dónde hospedar el procesador de fuente de cambios

El procesador de fuente de cambios se puede hospedar en cualquier plataforma que admita procesos o tareas de larga duración. Estos son algunos ejemplos:

Aunque el procesador de fuente de cambios puede ejecutarse en entornos de corta duración, dado que el contenedor de concesión mantiene el estado, el ciclo de inicio de estos entornos retrasará el tiempo de recepción de las notificaciones (debido a la sobrecarga que supone iniciar el procesador cada vez que se inicie el entorno).

Requisitos de acceso basado en roles

Al usar Microsoft Entra ID como mecanismo de autenticación, asegúrese de que la identidad tiene los permisos adecuados:

  • En el contenedor supervisado:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • En el contenedor de concesión:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Recursos adicionales

Pasos siguientes

Obtenga más información sobre el procesador de la fuente de cambios en los siguientes artículos: