Introdução à captura de dados de alteração no repositório analítico do Azure Cosmos DB
APLICA-SE A: NoSQL MongoDB
Use a captura de dados de alteração (CDC) no repositório analítico do Azure Cosmos DB como uma fonte para o Azure Data Factory ou o Azure Synapse Analytics para capturar alterações específicas em seus dados.
Nota
Observe que a interface de serviço vinculada para o Azure Cosmos DB para a API do MongoDB ainda não está disponível no Dataflow. No entanto, você poderá usar o ponto de extremidade de documento da sua conta com a interface de serviço vinculado "Azure Cosmos DB para NoSQL" como uma solução alternativa até que o serviço vinculado Mongo seja suportado. Em um serviço vinculado NoSQL, escolha "Enter Manualmente" para fornecer as informações da conta do Cosmos DB e use o ponto de extremidade do documento da conta (por exemplo: https://[your-database-account-uri].documents.azure.com:443/
) em vez do ponto de extremidade do MongoDB (por exemplo: mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/
)
Pré-requisitos
- Uma conta existente do Azure Cosmos DB.
- Se você tiver uma assinatura do Azure, crie uma nova conta.
- Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.
- Como alternativa, você pode experimentar o Azure Cosmos DB gratuitamente antes de confirmar.
Habilitar armazenamento analítico
Primeiro, habilite o Azure Synapse Link no nível da conta e, em seguida, habilite o armazenamento analítico para os contêineres apropriados para sua carga de trabalho.
Habilitar o Azure Synapse Link: Habilitar o Azure Synapse Link para uma conta do Azure Cosmos DB
Habilite o armazenamento analítico para seus contêineres:
Opção Guia Habilitar para um novo contêiner específico Habilite o Azure Synapse Link para seus novos contêineres Habilitar para um contêiner existente específico Habilite o Azure Synapse Link para seus contêineres existentes
Criar um recurso do Azure de destino usando fluxos de dados
O recurso de captura de dados de alteração do repositório analítico está disponível por meio do recurso de fluxo de dados do Azure Data Factory ou do Azure Synapse Analytics. Para este guia, use o Azure Data Factory.
Importante
Como alternativa, você pode usar o Azure Synapse Analytics. Primeiro, crie um espaço de trabalho do Azure Synapse, se ainda não tiver um. No espaço de trabalho recém-criado, selecione a guia Desenvolver, selecione Adicionar novo recurso e selecione Fluxo de dados.
Crie um Azure Data Factory, se ainda não tiver um.
Gorjeta
Se possível, crie o data factory na mesma região onde sua conta do Azure Cosmos DB reside.
Inicie o data factory recém-criado.
No data factory, selecione a guia Fluxos de dados e, em seguida, selecione Novo fluxo de dados.
Dê ao fluxo de dados recém-criado um nome exclusivo. Neste exemplo, o fluxo de dados é chamado
cosmoscdc
.
Definir configurações de origem para o contêiner de armazenamento analítico
Agora, crie e configure uma fonte para fluir dados do repositório analítico da conta do Azure Cosmos DB.
Selecione Adicionar Origem.
No campo Nome do fluxo de saída, insira cosmos.
Na seção Tipo de origem, selecione Inline.
No campo Conjunto de Dados, selecione Azure - Azure Cosmos DB para NoSQL.
Crie um novo serviço vinculado para sua conta chamado cosmoslinkedservice. Selecione sua conta existente do Azure Cosmos DB para NoSQL na caixa de diálogo pop-up Novo serviço vinculado e selecione Ok. Neste exemplo, selecionamos uma conta pré-existente do Azure Cosmos DB para NoSQL nomeada
msdocs-cosmos-source
e um banco de dados chamadocosmicworks
.Selecione Analítico para o tipo de loja.
Selecione a guia Opções de origem .
Em Opções de origem, selecione seu contêiner de destino e habilite a depuração do fluxo de dados. Neste exemplo, o contêiner é chamado
products
.Selecione Depuração de fluxo de dados. Na caixa de diálogo pop-up Ativar depuração de fluxo de dados, mantenha as opções padrão e selecione Ok.
A guia Opções de origem também contém outras opções que você pode querer ativar. Esta tabela descreve essas opções:
Opção | Description |
---|---|
Capturar atualizações intermediárias | Habilite essa opção se quiser capturar o histórico de alterações em itens, incluindo as alterações intermediárias entre leituras de captura de dados de alteração. |
Exclusões de captura | Habilite essa opção para capturar registros excluídos pelo usuário e aplicá-los no coletor. As exclusões não podem ser aplicadas no Azure Data Explorer e nos coletores do Azure Cosmos DB. |
Capturar TTLs de armazenamento transacional | Habilite essa opção para capturar registros excluídos de TTL do repositório transacional do Azure Cosmos DB (tempo de vida) e aplique no Coletor. As exclusões TTL não podem ser aplicadas no Azure Data Explorer e nos coletores do Azure Cosmos DB. |
Tamanho do lote em bytes | Essa configuração é, na verdade , gigabytes. Especifique o tamanho em gigabytes se quiser agrupar os feeds de captura de dados de alteração |
Configurações extras | Configurações extras do repositório analítico do Azure Cosmos DB e seus valores. (ex: spark.cosmos.allowWhiteSpaceInFieldNames -> true ) |
Trabalhando com opções de origem
Quando você verificar qualquer uma Capture intermediate updates
das opções , Capture Deltes
e , seu Capture Transactional store TTLs
processo CDC criará e preencherá o __usr_opType
campo no coletor com os seguintes valores:
valor | Description | Opção |
---|---|---|
1 | ATUALIZAR | Atualizações intermediárias do Capture |
2 | INSERT | Não há uma opção para inserções, ela está ativada por padrão |
3 | USER_DELETE | Exclusões de captura |
4 | TTL_DELETE | Capturar TTLs de armazenamento transacional |
Se você tiver que diferenciar os registros excluídos TTL de documentos excluídos por usuários ou aplicativos, você tem verificar ambos e Capture intermediate updates
Capture Transactional store TTLs
opções. Então você tem que adaptar seus processos CDC ou aplicativos ou consultas para usar __usr_opType
de acordo com o que é necessário para as necessidades do seu negócio.
Gorjeta
Se houver necessidade de os consumidores a jusante restaurarem a ordem das atualizações com a opção "capturar atualizações intermediárias" marcada, o campo de carimbo de data/hora _ts
do sistema pode ser usado como campo de pedido.
Criar e definir configurações de coletor para operações de atualização e exclusão
Primeiro, crie um coletor de Armazenamento de Blob do Azure simples e, em seguida, configure o coletor para filtrar dados para apenas operações específicas.
Crie uma conta e um contêiner do Armazenamento de Blobs do Azure, se ainda não tiver um. Para os próximos exemplos, usaremos uma conta chamada
msdocsblobstorage
e um contêiner chamadooutput
.Gorjeta
Se possível, crie a conta de armazenamento na mesma região onde sua conta do Azure Cosmos DB reside.
De volta ao Azure Data Factory, crie um novo coletor para os dados de alteração capturados de sua
cosmos
origem.Dê um nome exclusivo à pia. Neste exemplo, a pia é chamada
storage
.Na seção Tipo de coletor, selecione Inline. No campo Conjunto de dados, selecione Delta.
Crie um novo serviço vinculado para sua conta usando o Armazenamento de Blobs do Azure chamado storagelinkedservice. Selecione sua conta existente do Armazenamento de Blobs do Azure na caixa de diálogo pop-up Novo serviço vinculado e selecione Ok. Neste exemplo, selecionamos uma conta de Armazenamento de Blob do Azure pré-existente chamada
msdocsblobstorage
.Selecione o separador Definições.
Em Configurações, defina o caminho da pasta como o nome do contêiner de blob. Neste exemplo, o nome do contêiner é
output
.Localize a seção Método de atualização e altere as seleções para permitir apenas operações de exclusão e atualização . Além disso, especifique as colunas de chave como uma lista de colunas usando o campo
{_rid}
como identificador exclusivo.Selecione Validar para garantir que não cometeu erros ou omissões. Em seguida, selecione Publicar para publicar o fluxo de dados.
Programar a execução da captura de dados de alteração
Depois que um fluxo de dados for publicado, você poderá adicionar um novo pipeline para mover e transformar seus dados.
Criar um novo pipeline. Dê ao pipeline um nome exclusivo. Neste exemplo, o pipeline é chamado
cosmoscdcpipeline
.Na seção Atividades, expanda a opção Mover & transformar e selecione Fluxo de dados.
Dê um nome exclusivo à atividade de fluxo de dados. Neste exemplo, a atividade é denominada
cosmoscdcactivity
.Na guia Configurações, selecione o fluxo de dados nomeado que
cosmoscdc
você criou anteriormente neste guia. Em seguida, selecione um tamanho de computação com base no volume de dados e na latência necessária para sua carga de trabalho.Gorjeta
Para tamanhos de dados incrementais superiores a 100 GB, recomendamos o tamanho personalizado com uma contagem de núcleos de 32 (+16 núcleos de driver).
Selecione Adicionar gatilho. Agende esse pipeline para ser executado em uma cadência que faça sentido para sua carga de trabalho. Neste exemplo, o pipeline é configurado para ser executado a cada cinco minutos.
Nota
A janela mínima de recorrência para execuções de captura de dados de alteração é de um minuto.
Selecione Validar para garantir que não cometeu erros ou omissões. Em seguida, selecione Publicar para publicar o pipeline.
Observe os dados colocados no contêiner de Armazenamento de Blobs do Azure como uma saída do fluxo de dados usando a captura de dados de alteração do repositório analítico do Azure Cosmos DB.
Nota
O tempo inicial de inicialização do cluster pode levar até três minutos. Para evitar o tempo de inicialização do cluster nas execuções subsequentes de captura de dados de alteração, configure o valor Tempo de vida do cluster de fluxo de dados. Para obter mais informações sobre o tempo de execução de iteração e TTL, consulte Tempo de execução de integração no Azure Data Factory.
Trabalhos simultâneos
O tamanho do lote nas opções de origem, ou situações em que o coletor é lento para ingerir o fluxo de alterações, pode causar a execução de vários trabalhos ao mesmo tempo. Para evitar essa situação, defina a opção Simultaneidade como 1 nas configurações de Pipeline, para garantir que novas execuções não sejam acionadas até que a execução atual seja concluída.