Introdução à captura de dados de alteração no repositório analítico do Azure Cosmos DB

APLICA-SE A: NoSQL MongoDB

Use a captura de dados de alteração (CDC) no repositório analítico do Azure Cosmos DB como uma fonte para o Azure Data Factory ou o Azure Synapse Analytics para capturar alterações específicas em seus dados.

Nota

Observe que a interface de serviço vinculada para o Azure Cosmos DB para a API do MongoDB ainda não está disponível no Dataflow. No entanto, você poderá usar o ponto de extremidade de documento da sua conta com a interface de serviço vinculado "Azure Cosmos DB para NoSQL" como uma solução alternativa até que o serviço vinculado Mongo seja suportado. Em um serviço vinculado NoSQL, escolha "Enter Manualmente" para fornecer as informações da conta do Cosmos DB e use o ponto de extremidade do documento da conta (por exemplo: https://[your-database-account-uri].documents.azure.com:443/) em vez do ponto de extremidade do MongoDB (por exemplo: mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/)

Pré-requisitos

Habilitar armazenamento analítico

Primeiro, habilite o Azure Synapse Link no nível da conta e, em seguida, habilite o armazenamento analítico para os contêineres apropriados para sua carga de trabalho.

  1. Habilitar o Azure Synapse Link: Habilitar o Azure Synapse Link para uma conta do Azure Cosmos DB

  2. Habilite o armazenamento analítico para seus contêineres:

    Opção Guia
    Habilitar para um novo contêiner específico Habilite o Azure Synapse Link para seus novos contêineres
    Habilitar para um contêiner existente específico Habilite o Azure Synapse Link para seus contêineres existentes

Criar um recurso do Azure de destino usando fluxos de dados

O recurso de captura de dados de alteração do repositório analítico está disponível por meio do recurso de fluxo de dados do Azure Data Factory ou do Azure Synapse Analytics. Para este guia, use o Azure Data Factory.

Importante

Como alternativa, você pode usar o Azure Synapse Analytics. Primeiro, crie um espaço de trabalho do Azure Synapse, se ainda não tiver um. No espaço de trabalho recém-criado, selecione a guia Desenvolver, selecione Adicionar novo recurso e selecione Fluxo de dados.

  1. Crie um Azure Data Factory, se ainda não tiver um.

    Gorjeta

    Se possível, crie o data factory na mesma região onde sua conta do Azure Cosmos DB reside.

  2. Inicie o data factory recém-criado.

  3. No data factory, selecione a guia Fluxos de dados e, em seguida, selecione Novo fluxo de dados.

  4. Dê ao fluxo de dados recém-criado um nome exclusivo. Neste exemplo, o fluxo de dados é chamado cosmoscdc.

    Screnshot de um novo fluxo de dados com o nome cosmoscdc.

Definir configurações de origem para o contêiner de armazenamento analítico

Agora, crie e configure uma fonte para fluir dados do repositório analítico da conta do Azure Cosmos DB.

  1. Selecione Adicionar Origem.

    Captura de tela da opção de menu adicionar fonte.

  2. No campo Nome do fluxo de saída, insira cosmos.

    Captura de tela do nome do cosmos de origem recém-criado.

  3. Na seção Tipo de origem, selecione Inline.

    Captura de tela mostrando a seleção do tipo de fonte embutida.

  4. No campo Conjunto de Dados, selecione Azure - Azure Cosmos DB para NoSQL.

    Captura de tela da seleção do Azure Cosmos DB para NoSQL como o tipo de conjunto de dados.

  5. Crie um novo serviço vinculado para sua conta chamado cosmoslinkedservice. Selecione sua conta existente do Azure Cosmos DB para NoSQL na caixa de diálogo pop-up Novo serviço vinculado e selecione Ok. Neste exemplo, selecionamos uma conta pré-existente do Azure Cosmos DB para NoSQL nomeada msdocs-cosmos-source e um banco de dados chamado cosmicworks.

    Captura de tela da caixa de diálogo Novo serviço vinculado com uma conta do Azure Cosmos DB selecionada.

  6. Selecione Analítico para o tipo de loja.

    Captura de tela da opção analítica selecionada para um serviço vinculado.

  7. Selecione a guia Opções de origem .

  8. Em Opções de origem, selecione seu contêiner de destino e habilite a depuração do fluxo de dados. Neste exemplo, o contêiner é chamado products.

    Captura de tela de um contêiner de origem selecionado como produtos nomeados.

  9. Selecione Depuração de fluxo de dados. Na caixa de diálogo pop-up Ativar depuração de fluxo de dados, mantenha as opções padrão e selecione Ok.

    Captura de tela da opção de alternância para habilitar a depuração do fluxo de dados.

  10. A guia Opções de origem também contém outras opções que você pode querer ativar. Esta tabela descreve essas opções:

Opção Description
Capturar atualizações intermediárias Habilite essa opção se quiser capturar o histórico de alterações em itens, incluindo as alterações intermediárias entre leituras de captura de dados de alteração.
Exclusões de captura Habilite essa opção para capturar registros excluídos pelo usuário e aplicá-los no coletor. As exclusões não podem ser aplicadas no Azure Data Explorer e nos coletores do Azure Cosmos DB.
Capturar TTLs de armazenamento transacional Habilite essa opção para capturar registros excluídos de TTL do repositório transacional do Azure Cosmos DB (tempo de vida) e aplique no Coletor. As exclusões TTL não podem ser aplicadas no Azure Data Explorer e nos coletores do Azure Cosmos DB.
Tamanho do lote em bytes Essa configuração é, na verdade , gigabytes. Especifique o tamanho em gigabytes se quiser agrupar os feeds de captura de dados de alteração
Configurações extras Configurações extras do repositório analítico do Azure Cosmos DB e seus valores. (ex: spark.cosmos.allowWhiteSpaceInFieldNames -> true)

Trabalhando com opções de origem

Quando você verificar qualquer uma Capture intermediate updatesdas opções , Capture Deltese , seu Capture Transactional store TTLs processo CDC criará e preencherá o __usr_opType campo no coletor com os seguintes valores:

valor Description Opção
1 ATUALIZAR Atualizações intermediárias do Capture
2 INSERT Não há uma opção para inserções, ela está ativada por padrão
3 USER_DELETE Exclusões de captura
4 TTL_DELETE Capturar TTLs de armazenamento transacional

Se você tiver que diferenciar os registros excluídos TTL de documentos excluídos por usuários ou aplicativos, você tem verificar ambos e Capture intermediate updates Capture Transactional store TTLs opções. Então você tem que adaptar seus processos CDC ou aplicativos ou consultas para usar __usr_opType de acordo com o que é necessário para as necessidades do seu negócio.

Gorjeta

Se houver necessidade de os consumidores a jusante restaurarem a ordem das atualizações com a opção "capturar atualizações intermediárias" marcada, o campo de carimbo de data/hora _ts do sistema pode ser usado como campo de pedido.

Criar e definir configurações de coletor para operações de atualização e exclusão

Primeiro, crie um coletor de Armazenamento de Blob do Azure simples e, em seguida, configure o coletor para filtrar dados para apenas operações específicas.

  1. Crie uma conta e um contêiner do Armazenamento de Blobs do Azure, se ainda não tiver um. Para os próximos exemplos, usaremos uma conta chamada msdocsblobstorage e um contêiner chamado output.

    Gorjeta

    Se possível, crie a conta de armazenamento na mesma região onde sua conta do Azure Cosmos DB reside.

  2. De volta ao Azure Data Factory, crie um novo coletor para os dados de alteração capturados de sua cosmos origem.

    Captura de tela mostrando a adição de um novo coletor conectado à fonte existente.

  3. Dê um nome exclusivo à pia. Neste exemplo, a pia é chamada storage.

    Captura de tela mostrando como nomear o armazenamento de coletor recém-criado.

  4. Na seção Tipo de coletor, selecione Inline. No campo Conjunto de dados, selecione Delta.

    Captura de tela da seleção e do tipo de conjunto de dados Delta embutido para o coletor.

  5. Crie um novo serviço vinculado para sua conta usando o Armazenamento de Blobs do Azure chamado storagelinkedservice. Selecione sua conta existente do Armazenamento de Blobs do Azure na caixa de diálogo pop-up Novo serviço vinculado e selecione Ok. Neste exemplo, selecionamos uma conta de Armazenamento de Blob do Azure pré-existente chamada msdocsblobstorage.

    Captura de tela das opções de tipo de serviço para um novo serviço vinculado Delta.

    Captura de tela da caixa de diálogo Novo serviço vinculado com uma conta de Armazenamento de Blob do Azure selecionada.

  6. Selecione o separador Definições.

  7. Em Configurações, defina o caminho da pasta como o nome do contêiner de blob. Neste exemplo, o nome do contêiner é output.

    Captura de tela do contêiner de blob chamado output set como o destino do coletor.

  8. Localize a seção Método de atualização e altere as seleções para permitir apenas operações de exclusão e atualização . Além disso, especifique as colunas de chave como uma lista de colunas usando o campo {_rid} como identificador exclusivo.

    Captura de tela dos métodos de atualização e colunas de chave que estão sendo especificados para o coletor.

  9. Selecione Validar para garantir que não cometeu erros ou omissões. Em seguida, selecione Publicar para publicar o fluxo de dados.

    Captura de ecrã da opção para validar e, em seguida, publicar o fluxo de dados atual.

Programar a execução da captura de dados de alteração

Depois que um fluxo de dados for publicado, você poderá adicionar um novo pipeline para mover e transformar seus dados.

  1. Criar um novo pipeline. Dê ao pipeline um nome exclusivo. Neste exemplo, o pipeline é chamado cosmoscdcpipeline.

    Captura de tela da nova opção de pipeline na seção de recursos.

  2. Na seção Atividades, expanda a opção Mover & transformar e selecione Fluxo de dados.

    Captura de tela da opção de atividade de fluxo de dados na seção de atividades.

  3. Dê um nome exclusivo à atividade de fluxo de dados. Neste exemplo, a atividade é denominada cosmoscdcactivity.

  4. Na guia Configurações, selecione o fluxo de dados nomeado que cosmoscdc você criou anteriormente neste guia. Em seguida, selecione um tamanho de computação com base no volume de dados e na latência necessária para sua carga de trabalho.

    Captura de tela das definições de configuração para o fluxo de dados e o tamanho de computação para a atividade.

    Gorjeta

    Para tamanhos de dados incrementais superiores a 100 GB, recomendamos o tamanho personalizado com uma contagem de núcleos de 32 (+16 núcleos de driver).

  5. Selecione Adicionar gatilho. Agende esse pipeline para ser executado em uma cadência que faça sentido para sua carga de trabalho. Neste exemplo, o pipeline é configurado para ser executado a cada cinco minutos.

    Captura de tela do botão adicionar gatilho para um novo pipeline.

    Captura de tela de uma configuração de gatilho com base em um cronograma, a partir do ano de 2023, que é executado a cada cinco minutos.

    Nota

    A janela mínima de recorrência para execuções de captura de dados de alteração é de um minuto.

  6. Selecione Validar para garantir que não cometeu erros ou omissões. Em seguida, selecione Publicar para publicar o pipeline.

  7. Observe os dados colocados no contêiner de Armazenamento de Blobs do Azure como uma saída do fluxo de dados usando a captura de dados de alteração do repositório analítico do Azure Cosmos DB.

    Screnshot dos arquivos de saída do pipeline no contêiner de Armazenamento de Blob do Azure.

    Nota

    O tempo inicial de inicialização do cluster pode levar até três minutos. Para evitar o tempo de inicialização do cluster nas execuções subsequentes de captura de dados de alteração, configure o valor Tempo de vida do cluster de fluxo de dados. Para obter mais informações sobre o tempo de execução de iteração e TTL, consulte Tempo de execução de integração no Azure Data Factory.

Trabalhos simultâneos

O tamanho do lote nas opções de origem, ou situações em que o coletor é lento para ingerir o fluxo de alterações, pode causar a execução de vários trabalhos ao mesmo tempo. Para evitar essa situação, defina a opção Simultaneidade como 1 nas configurações de Pipeline, para garantir que novas execuções não sejam acionadas até que a execução atual seja concluída.

Próximos passos