Padrões de tarefas de replicação de eventos

A visão geral da federação e das funções do replicador explicam a lógica para os elementos básicos das tarefas de replicação. Recomenda-se que você conheça essas visões gerais antes de prosseguir neste artigo.

Neste artigo detalhamos as diretrizes de implementação para vários padrões destacados na seção visão geral.

Replicação

O padrão de Replicação copia eventos de um Hub de Eventos para o próximo, ou de um Hub de Eventos para outro destino, como uma fila de Barramento de Serviço. Os eventos são encaminhados sem fazer modificações no conteúdo do evento.

A implementação desse padrão é coberta pela replicação de Eventos entre os Hubs de Eventos e a replicação de Eventos entre os exemplos de Hubs de Eventos e de Barramento de Serviço, e a Utilização do Apache Kafka MirrorMaker com o tutorial do Hubs de Eventos para o caso específico de replicar dados de um agente Apache Kafka para dentro dos Hubs de Eventos.

Fluxos e preservação da ordem

A replicação, seja por meio do Azure Functions ou do Azure Stream Analytics, não visa garantir a criação de clones exatos individuais de um Hub de Eventos de origem para um Hub de Eventos de destino, mas se concentra na preservação da ordem relativa dos eventos onde o aplicativo exige isso. O aplicativo comunica isso agrupando eventos relacionados com a mesma chave de partição e os Hubs de Eventos organizam as mensagens com a mesma chave de partição sequencialmente na mesma partição.

Importante

As informações de "deslocamento" são exclusivas para cada Hub de Eventos, e os deslocamentos para os mesmos eventos são diferentes nas instâncias do Hub de Eventos. Para localizar uma posição em um fluxo de eventos copiado, use deslocamentos baseados no tempo e consulte os metadados propagados atribuídos a serviços.

Os deslocamentos com base no tempo iniciam o receptor em um ponto específico no tempo:

  • EventPosition. FromStart () - lê novamente todos os dados retidos.
  • EventPosition.FromEnd() - lê todos os novos dados a partir da hora da conexão.
  • EventPosition.FromEnqueuedTime(dateTime) - lê todos os dados a partir de uma determinada data e hora.

No EventProcessor, você define a posição por meio do InitialOffsetProvider no EventProcessorOptions. Com as outras APIs do destinatário, a posição é passada pelo construtor.

Os auxiliares da função de replicação pré-criados fornecidos como exemplos, que são usados nas diretrizes baseadas no Azure Functions, garantem que os fluxos de eventos com a mesma chave de partição recuperada de uma partição de origem sejam enviados para o Hub de Eventos de destino como um lote no fluxo original, e com a mesma chave de partição.

Se a contagem de partições do Hub de Eventos de origem e de destino for idêntica, todos os fluxos no destino são mapeados para as mesmas partições da origem. Se a contagem de partições for diferente, que é importante para alguns dos padrões descritos a seguir, o mapeamento será diferente, mas os fluxos são sempre mantidos juntos e em ordem.

A ordem relativa dos eventos que pertencem a fluxos diferentes, ou de eventos independentes sem uma chave de partição em uma partição de destino, pode ser sempre diferente da partição de origem.

Metadados atribuídos ao serviço

Os metadados atribuídos ao serviço de um evento obtido do Hub de Eventos de origem, a hora de enfileiramento original, o número de sequência e o deslocamento, são substituídos por novos valores atribuídos ao serviço no Hub de Eventos de destino, mas com as funções auxiliares (tarefas de replicação fornecidas em nossos exemplos), os valores originais são preservados nas propriedades do usuário: repl-enqueue-time (cadeia de caracteres ISO8601), repl-sequence, repl-offset.

Essas propriedades são do tipo string (cadeia de caracteres) e contêm o valor em cadeias de caracteres das respectivas propriedades originais. Se o evento for encaminhado várias vezes, os metadados atribuídos ao serviço da origem imediata serão anexados às propriedades já existentes, com valores separados por ponto e vírgula.

Failover

Se você estiver usando a replicação com o objetivo de usar na recuperação de desastres, para se proteger contra eventos de disponibilidade regional no serviço de Hubs de Eventos ou contra interrupções de rede, qualquer cenário de falha desse tipo exigirá a execução de um failover de um Hub de Eventos para o próximo hub, informando produtores e/ou consumidores para usarem o ponto de extremidade secundário.

Para todos os cenários de failover, supõe-se que os elementos necessários dos namespaces sejam estruturalmente idênticos, o que significa que os Hubs de Eventos e os Grupos de Consumidores são nomeados de modo idêntico, e que as regras de assinatura de acesso compartilhado e/ou as regras de controle de acesso baseado em função estão configuradas da mesma maneira. Para criar (e atualizar) um namespace secundário, siga as diretrizes para mover namespaces e omita a etapa de limpeza.

Para forçar produtores e consumidores a mudarem, você precisa disponibilizar as informações sobre qual namespace usar, em um local que seja fácil de pesquisar, acessar e atualizar. Se produtores ou consumidores encontrarem erros frequentes ou persistentes, eles devem consultar esse local e ajustar sua configuração. Há várias maneiras de compartilhar essa configuração, mas indicamos dois: DNS e compartilhamentos de arquivos.

Configuração de failover com base em DNS

Uma abordagem possível é manter as informações em Registros SRV de DNS em um DNS que você controla, e apontar para os respectivos pontos de extremidade do Hub de Eventos.

Importante

Lembre-se de que os Hubs de Eventos não permitem que seus pontos de extremidade tenham um alias direto com registros CNAME, o que significa que você usa o DNS como um mecanismo de pesquisa resiliente para endereços de ponto de extremidade e não para resolver diretamente as informações de endereço IP.

Suponha que você tenha o domínio example.com e, para seu aplicativo, uma zona test.example.com. Para dois Hubs de Eventos alternativos, você cria duas zonas aninhadas adicionais e um Registro SRV em cada um.

Os Registros SRV são, seguindo a convenção comum, prefixados com _azure_eventhubs._amqp e retêm dois registros de ponto de extremidade: um para AMQP-over-TLS na porta 5671 e outro para AMQP-over-WebSockets na porta 443, ambos apontando para o ponto de extremidade dos Hubs de Eventos do namespace correspondente à zona.

Zona Registros SRV
eh1.test.example.com _azure_servicebus._amqp.eh1.test.example.com
1 1 5671 eh1-test-example-com.servicebus.windows.net
2 2 443 eh1-test-example-com.servicebus.windows.net
eh2.test.example.com _azure_servicebus._amqp.eh2.test.example.com
1 1 5671 eh2-test-example-com.servicebus.windows.net
2 2 443 eh2-test-example-com.servicebus.windows.net

Na zona do aplicativo, crie uma entrada CNAME que aponta para a zona subordinada correspondente ao seu Hub de Eventos primário:

Registro CNAME Alias
eventhub.test.example.com eh1.test.example.com

Usando um cliente DNS que permite a consulta de registros CNAME e SRV de forma explícita (os clientes internos do Java e do .NET só permitem a resolução simples de nomes para endereços IP), você pode resolver o ponto de extremidade desejado. Com DnsClient.net, por exemplo, a função pesquisa é:

static string GetEventHubName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_eventhub._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

A função retorna o nome do host de destino registrado para a porta 5671 da zona que é o alias atual do CNAME, conforme mostrado acima.

A execução do failover requer que o registro CNAME seja alterado, apontando-o para a zona alternativa.

A vantagem de usar o DNS e, especificamente, o DNS do Azure, é que as informações do DNS do Azure são replicadas globalmente e, portanto, são resilientes contra interrupções de uma única região.

Esse procedimento é semelhante ao funcionamento do Geo-Dr dos Hubs de Eventos, mas você tem o controle total. Ele também funciona com cenários ativos/inativos.

Configuração de failover com base em compartilhamento de arquivo

A alternativa mais simples ao uso de DNS para compartilhar informações de ponto de extremidade, é colocar o nome do ponto de extremidade primário em um arquivo de texto sem formatação e disponibilizar o arquivo de uma infraestrutura robusta contra interrupções que permita atualizações.

Se você já tiver uma infraestrutura de site com alta disponibilidade, com disponibilidade global e replicação de conteúdo, adicione e republique o arquivo se uma troca for necessária.

Cuidado

Você só deve publicar o nome do ponto de extremidade dessa forma, não use uma cadeia de conexão completa, incluindo senhas.

Considerações adicionais sobre o failover de consumidores

Para os consumidores do Hub de Eventos, dependendo das necessidades do processador de eventos, serão necessárias.outras considerações sobre a estratégia de failover.

Se houver um desastre que exija a recriação de um sistema (inclusive de bancos de dados) a partir dos dados de backup, e os bancos de dado são alimentados diretamente ou por meio de processamento intermediário dos eventos mantidos no Hub de Eventos, você precisa restaurar o backup e, em seguida, comece a reproduzir os eventos no sistema a partir do momento em que o backup do banco de dados foi criado e não do momento em que o sistema original foi destruído.

Se a falha afeta apenas uma parte do sistema ou apenas um único Hub de Eventos, que se tornou inacessível, você vai querer continuar o processamento dos eventos a partir da mesma posição em que o processamento foi interrompido.

Para executar qualquer cenário e usar o processador de eventos do seu respectivo SDK do Azure, crie um novo armazenamento de ponto de verificação e providencie uma posição de partição inicial com base no carimbo de data/hora a partir do qual você deseja retomar o processamento.

Se você ainda tiver acesso ao armazenamento de ponto de verificação do Hub de Eventos do qual está mudando, os metadados propagados discutidos acima ajudarão você a pular os eventos que já foram tratados e retomar precisamente do ponto onde tudo parou.

Mesclar

O padrão de mesclagem tem uma ou mais tarefas de replicação que apontam para um destino, possivelmente ao mesmo tempo com produtores regulares, que também enviam eventos para o mesmo destino.

As variações desses padrões são:

  • Duas ou mais funções de replicação obtendo simultaneamente eventos de origens separadas e enviando-os para o mesmo destino.
  • Mais uma função de replicação obtendo eventos de uma fonte enquanto o destino também é usado diretamente por produtores.
  • O padrão anterior, mas espelhado entre dois ou mais Hubs de Eventos, ou seja, ambos os hubs contendo os mesmos fluxos, independentemente de onde os eventos são produzidos.

As duas primeiras variações de padrão são triviais e não diferem das tarefas de replicação simples.

O último cenário requer a exclusão de eventos já replicados para que não sejam replicados novamente. A técnica é demonstrada e explicada no exemplo EventHubToEventHubMerge.

Editor

O padrão do editor se baseia no padrão de replicação, mas as mensagens são modificadas antes de serem encaminhadas.

Exemplos destas modificações:

  • Transcodificação – se o conteúdo do evento (também conhecido como "corpo" ou "conteúdo") chega da origem codificada usando o formato Apache Avro ou outro formato de serialização proprietário, mas a expectativa do sistema destino é que o conteúdo seja codificado em JSON, uma tarefa de replicação de transcodificação primeiro desserializa o conteúdo do Apache Avro em um grafo de objeto na memória interna e, em seguida, serializa esse grafo no formato JSON para o evento que está sendo encaminhado. A transcodificação também inclui as tarefas de compactação e de descompactação do conteúdo.
  • Transformação - os eventos que contêm dados estruturados podem exigir a reformatação desses dados para facilitar o processamento por consumidores de downstream. Este escopo pode envolver trabalhos como por exemplo, nivelar estruturas aninhadas, remover elementos de dados estranhos ou reformatar o conteúdo para ajustá-lo exatamente a um determinado esquema.
  • Envio em lote - os eventos podem ser recebidos em lotes (vários eventos em uma única transferência) de uma origem, mas precisam ser encaminhados individualmente para um destino, ou vice-versa. Portanto, uma tarefa pode encaminhar vários eventos com base em uma única transferência de evento de entrada ou agregar um conjunto de eventos que são transferidos juntos.
  • Validação – os dados de evento de origens externas geralmente precisam ser verificados se estão em conformidade com um conjunto de regras, antes que possam ser encaminhados. As regras podem ser expressas usando esquemas ou código. Eventos que não estão em conformidade podem ser removidos, com o problema registrado em logs, ou podem ser encaminhados a um destino especial para que sejam posteriormente tratadas.
  • Enriquecimento - os dados de eventos provenientes de algumas origens podem exigir enriquecimento com contexto adicional para que sejam utilizáveis nos sistemas de destino. Esse enriquecimento pode envolver a pesquisa de dados de referência e a incorporação desses dados com o evento, ou a adição de informações sobre a origem que é conhecida pela tarefa de replicação, mas não estão contidas nos eventos.
  • Filtragem - alguns eventos que chegam de uma origem podem precisar ser retidos com base em alguma regra. Um filtro testa o evento em relação a uma regra e remove o evento se não corresponder à regra. Uma forma de filtragem é remover eventos duplicados observando determinados critérios e descartar eventos subsequentes com os mesmos valores.
  • Criptografia – uma tarefa de replicação pode precisar descriptografar o conteúdo que chega da origem e/ou criptografar o conteúdo encaminhado a um destino e/ou pode precisar verificar a integridade do conteúdo e dos metadados em relação a uma assinatura executada no evento, ou anexar a assinatura.
  • Atestado - uma tarefa de replicação pode anexar metadados, possivelmente protegidos por uma assinatura digital, a um evento que atesta que o evento foi recebido por meio de um canal específico ou em um momento específico.
  • Encadeamento - uma tarefa de replicação pode aplicar assinaturas a fluxos de eventos, de modo que a integridade do fluxo seja protegida e os eventos ausentes possam ser detectados.

Os padrões de transformação, processamento em lote e enriquecimento geralmente são melhor implementados com trabalhos do Azure Stream Analytics.

Todos esses padrões podem ser implementados usando o Azure Functions, com o gatilho do Hubs de Eventos para obter os eventos e a Associação de saída do Hub de Eventos para entregá-los.

Roteamento

O padrão de roteamento baseia-se no padrão de replicação, mas em vez de ter uma origem e um destino, a tarefa de replicação tem vários destinos, ilustrados aqui em C#:

[FunctionName("EH2EH")]
public static async Task Run(
    [EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
    [EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
    [EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
    ILogger log)
{
    foreach (EventData eventData in events)
    {
        // send to output1 and/or output2 based on criteria
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
        });
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
        });
    }
}

A função de roteamento considera os metadados da mensagem e/ou o conteúdo da mensagem e, em seguida, seleciona um dos destinos disponíveis para enviar.

No Azure Stream Analytics você pode conseguir o mesmo usando a definição de várias saídas e, em seguida, executando uma consulta por saída.

select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2

Projeção de log

O padrão de projeção de log mescla o fluxo de eventos em um banco de dados indexado com eventos que se tornam registros no banco de dados. Normalmente os eventos são adicionados à mesma coleção ou tabela, e a chave de partição do Hub de Eventos se torna parte da chave primária, tentando fazer com que o registro se torne exclusivo.

A projeção de log pode produzir uma série temporal histórica de seus dados de evento ou uma exibição compactada, na qual apenas o evento mais recente é retido para cada chave de partição. A forma do banco de dados de destino depende de você e das necessidades do seu aplicativo. Este padrão também é conhecido como "fonte do evento".

Dica

Você pode criar projeções de log de forma fácil no Banco de Dados SQL do Azure e no Azure Cosmos DB em Azure Stream Analytics, sua melhor opção.

O próximo Azure Functions projeta o conteúdo compactado de um Hub de Eventos em uma coleção do Azure Cosmos DB.

[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
    [EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
    [CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
    ILogger log)
{
    foreach (var ev in input)
    {
        if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
        {
            var record = new
            {
                id = ev.SystemProperties.PartitionKey,
                data = JsonDocument.Parse(ev.Body),
                properties = ev.Properties
            };
            await output.AddAsync(record);
        }
    }
}

Próximas etapas