As APIs APPLY CHANGES: Simplifique a captura de dados de alteração com Delta Live Tables

O Delta Live Tables simplifica a captura de dados de alteração (CDC) com as APIs APPLY CHANGES e APPLY CHANGES FROM SNAPSHOT. A interface que você usa depende da fonte de dados de alteração:

  • Use APPLY CHANGES para processar alterações de um feed de dados de alterações (CDF).
  • Use APPLY CHANGES FROM SNAPSHOT (Visualização Pública) para processar alterações em instantâneos de banco de dados.

Anteriormente, a instrução MERGE INTO era comumente usada para processar registros CDC no Azure Databricks. No entanto, MERGE INTO pode produzir resultados incorretos devido a registros fora de sequência ou requer lógica complexa para reordenar os registros.

A API APPLY CHANGES é compatível com as interfaces SQL e Python do Delta Live Tables. A API APPLY CHANGES FROM SNAPSHOT é compatível com a interface Python do Delta Live Tables.

Tanto o APPLY CHANGES quanto o APPLY CHANGES FROM SNAPSHOT dão suporte à atualização de tabelas usando SCD tipo 1 e tipo 2:

  • Use o SCD tipo 1 para atualizar registros diretamente. O histórico não é retido para registros atualizados.
  • Use o SCD tipo 2 para reter um histórico de registros, seja em todas as atualizações ou nas atualizações de um conjunto específico de colunas.

Para sintaxe e outras referências, consulte:

Observação

Este artigo descreve como atualizar tabelas em seu pipeline do Delta Live Tables com base em alterações nos dados de origem. Para saber como registrar e consultar informações de alteração em nível de linha em tabelas Delta, confira Usar o feed de dados de alterações do Delta Lake no Azure Databricks.

Requisitos

Para usar as APIs CDC, seu pipeline deve ser configurado para usar pipelines DLT sem servidor ou o Delta Live Tables Pro ou Advanced edições.

Como o CDC é implementado com a API APPLY CHANGES?

Ao lidar automaticamente com registros fora de sequência, a API APPLY CHANGES nas Tabelas Dinâmicas Delta garante o processamento correto dos registros CDC e remove a necessidade de desenvolver uma lógica complexa para lidar com registros fora de sequência. Você deve especificar uma coluna nos dados de origem na qual sequenciar os registros, que o Delta Live Tables interpreta como uma representação monotonicamente crescente da ordem adequada dos dados de origem. O Delta Live Tables manipula automaticamente os dados que chegam fora de ordem. Para alterações de SCD tipo 2, o Delta Live Tables propaga os valores de sequenciamento apropriados para as colunas __START_AT e __END_AT da tabela de destino. Deve haver uma atualização distinta por chave em cada valor de sequenciamento, e não há suporte para valores de sequenciamento NULL.

Para executar o processamento de CDC com APPLY CHANGES, primeiro crie uma tabela de streaming e, em seguida, use a instrução APPLY CHANGES INTO em SQL ou a função apply_changes() em Python para especificar a origem, as chaves e o sequenciamento do feed de alterações. Para criar a tabela de streaming de destino, use a instrução CREATE OR REFRESH STREAMING TABLE no SQL ou a função create_streaming_table() no Python. Consulte os exemplos de processamento SCD tipo 1 e tipo 2.

Para obter detalhes de sintaxe, consulte Delta Live Tables referência SQL ou referência Python.

Como o CDC é implementado com a API APPLY CHANGES FROM SNAPSHOT?

Importante

A API APPLY CHANGES FROM SNAPSHOT está em Visualização Pública.

APPLY CHANGES FROM SNAPSHOT é uma API declarativa que determina com eficiência as alterações nos dados de origem comparando uma série de instantâneos em ordem e, em seguida, executa o processamento necessário para o processamento CDC dos registros nos instantâneos. APPLY CHANGES FROM SNAPSHOT é suportado apenas pela interface Python do Delta Live Tables.

APPLY CHANGES FROM SNAPSHOT suporta a ingestão de snapshots de vários tipos de origem:

  • Use a ingestão periódica de snapshots para ingerir snapshots de uma tabela ou exibição existente. APPLY CHANGES FROM SNAPSHOT tem uma interface simples e simplificada para dar suporte à ingestão periódica de instantâneos de um objeto de banco de dados existente. Um novo instantâneo é ingerido com cada atualização de pipeline e o tempo de ingestão é usado como a versão do instantâneo. Quando um pipeline é executado no modo contínuo, vários instantâneos são ingeridos com cada atualização de pipeline em um período determinado pela configuração intervalo de gatilho para o fluxo que contém o processamento APPLY CHANGES FROM SNAPSHOT.
  • Use a ingestão de snapshots históricos para processar arquivos que contêm snapshots de banco de dados, como snapshots gerados a partir de um banco de dados Oracle ou MySQL ou de um data warehouse.

Para executar o processamento CDC de qualquer tipo de origem com APPLY CHANGES FROM SNAPSHOT, primeiro crie uma tabela de streaming e, em seguida, use a função apply_changes_from_snapshot() em Python para especificar o instantâneo, as chaves e outros argumentos necessários para implementar o processamento. Consulte os exemplos de ingestão periódica de snapshots e ingestão de snapshots históricos.

Os snapshots passados para a API devem estar em ordem crescente por versão. Se o Delta Live Tables detectar um instantâneo fora de ordem, um erro será gerado.

Para obter detalhes de sintaxe, consulte Delta Live Tables referência do Python.

Limitações

A coluna usada para sequenciamento deve ser um tipo de dados classificável.

Exemplo: processamento SCD tipo 1 e SCD tipo 2 com dados de origem CDF

As seções a seguir fornecem exemplos de consultas SCD do Delta Live Tables tipo 1 e tipo 2 que atualizam tabelas de destino com base em eventos de origem de um feed de dados de alteração que:

  1. Cria novos registros de usuário.
  2. Exclui um registro de usuário.
  3. Atualiza os registros do usuário. No exemplo do SCD tipo 1, as últimas operações UPDATE chegam tarde e são removidas da tabela de destino, demonstrando a manipulação de eventos fora de ordem.

Os exemplos a seguir assumem familiaridade com a configuração e atualização de pipelines do Delta Live Tables. Consulte Tutorial: como executar seu primeiro pipeline do Delta Live Tables.

Para executar esses exemplos, você deve começar criando um conjunto de dados de exemplo. Consulte Gerar dados de teste.

Veja a seguir os registros de entrada para esses exemplos:

userId name city operação sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 nulo null Delete (excluir) 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Se você descompactar a linha final nos dados de exemplo, ele inserirá o seguinte registro que especifica onde os registros devem ser truncados:

userId name city operação sequenceNum
null nulo nulo TRUNCATE 3

Observação

Todos os exemplos a seguir incluem opções para especificar operações DELETE e TRUNCATE, mas cada uma é opcional.

Processar atualizações do SCD tipo 1

O exemplo a seguir demonstra o processamento de atualizações do SCD tipo 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Depois de executar o exemplo do SCD tipo 1, a tabela de destino contém os seguintes registros:

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

Depois de executar o exemplo SCD tipo 1 com o registro adicional TRUNCATE, os registros 124 e 126 são truncados devido à operação TRUNCATE em sequenceNum=3, e a tabela de destino contém o seguinte registro:

userId name city
125 Mercedes Guadalajara

Processar atualizações do SCD tipo 2

O exemplo a seguir demonstra o processamento de atualizações do SCD tipo 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Depois de executar o exemplo do SCD tipo 2, a tabela de destino contém os seguintes registros:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 nulo
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 nulo
126 Lily Cancun 2 nulo

Uma consulta SCD tipo 2 também pode especificar um subconjunto de colunas de saída a serem rastreadas para o histórico na tabela de destino. As alterações em outras colunas são atualizadas em vez de gerar novos registros de histórico. O exemplo a seguir demonstra a exclusão da coluna city do acompanhamento:

O exemplo a seguir demonstra o uso do histórico de faixas com o SCD tipo 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Depois de executar este exemplo sem o registro adicional TRUNCATE, a tabela de destino contém os seguintes registros:

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 nulo
125 Mercedes Guadalajara 2 nulo
126 Lily Cancun 2 nulo

Gerar dados de teste

O código a seguir é fornecido para gerar um conjunto de dados de exemplo para uso nas consultas de exemplo presentes neste tutorial. Supondo que você tenha as credenciais adequadas para criar um novo esquema e criar uma nova tabela, você pode executar essas instruções com um notebook ou o Databricks SQL. O código a seguir não pretende ser executado como parte de um pipeline do Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Exemplo: processamento periódico de snapshots

O exemplo a seguir demonstra o processamento SCD tipo 2 que ingere instantâneos de uma tabela armazenada em mycatalog.myschema.mytable. Os resultados do processamento são gravados em uma tabela chamada target.

mycatalog.myschema.mytable registros no carimbo de data/hora 2024-01-01 00:00:00

Chave Valor
1 a1
2 a2

mycatalog.myschema.mytable registros no carimbo de data/hora 2024-01-01 12:00:00

Chave Valor
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Depois de processar os instantâneos, a tabela de destino contém os seguintes registros:

Chave Valor __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 nulo
3 a3 2024-01-01 12:00:00 nulo

Exemplo: Processamento de instantâneo histórico

O exemplo a seguir demonstra o processamento do SCD tipo 2 que atualiza uma tabela de destino com base em eventos de origem de dois snapshots armazenados em um sistema de armazenamento em nuvem:

Instantâneo em timestamp, armazenado em /<PATH>/filename1.csv

Chave TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

Instantâneo em timestamp + 5, armazenado em /<PATH>/filename2.csv

Chave TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

O exemplo de código a seguir demonstra o processamento de atualizações do SCD tipo 2 com esses instantâneos:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Depois de processar os instantâneos, a tabela de destino contém os seguintes registros:

Chave TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 nulo
3 a3 b3 2 nulo
4 a4 b4_new 1 nulo

Adicionar, alterar ou excluir dados em uma tabela de streaming de destino

Se o pipeline publicar tabelas no Catálogo do Unity, você poderá usar instruções DML linguagem de manipulação de dados, incluindo instruções de inserção, atualização, exclusão e mesclagem, para modificar as tabelas de streaming de destino criadas por instruções APPLY CHANGES INTO.

Observação

  • Não há suporte para instruções DML que modificam o esquema de tabela de uma tabela de streaming. Verifique se as instruções DML não tentam desenvolver o esquema da tabela.
  • Instruções DML que atualizam uma tabela de streaming só podem ser executadas em um cluster compartilhado do Catálogo do Unity ou em um SQL warehouse usando o Databricks Runtime 13.3 LTS e versões superiores.
  • Como o streaming exige fontes de dados somente acréscimo, se o processamento exigir streaming de uma tabela de streaming de origem com alterações (por exemplo, por instruções DML), defina o sinalizador skipChangeCommits ao ler a tabela de streaming de origem. Quando skipChangeCommits é definido, as transações que excluem ou modificam registros na tabela de origem são ignoradas. Se o processamento não exigir uma tabela de streaming, você poderá usar uma exibição materializada (que não tem a restrição somente acréscimo) como a tabela de destino.

Como o Delta Live Tables usa uma coluna SEQUENCE BY especificada e propaga valores de sequenciamento apropriados para as colunas __START_AT e __END_AT da tabela de destino (para SCD tipo 2), você deve garantir que as instruções DML usem valores válidos para essas colunas para manter a ordenação adequada de registros. Consulte Como o CDC é implementado com a API APPLY CHANGES? .

Para obter mais informações sobre como usar instruções DML com tabelas de streaming, consulte Adicionar, alterar ou excluir dados em uma tabela de streaming.

O exemplo a seguir insere um registro ativo com uma sequência inicial de 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Ler um feed de dados de alteração de uma tabela de destino APPLY CHANGES

No Databricks Runtime 15.2 e superior, você pode ler um feed de dados de alteração de uma tabela de streaming que é o destino de consultas APPLY CHANGES ou APPLY CHANGES FROM SNAPSHOT da mesma forma que lê um feed de dados de alteração de outras tabelas do Delta. Os seguintes itens são necessários para ler o feed de dados de alteração de uma tabela de streaming de destino:

  • A tabela de streaming de destino deve ser publicada no Catálogo do Unity. Consulte Usar o Catálogo do Unity com seus pipelines das Tabelas Dinâmicas do Delta.
  • Para ler o feed de dados de alteração da tabela de streaming de destino, você deve usar o Databricks Runtime 15.2 ou superior. Para ler o feed de dados de alteração em outro pipeline do Delta Live Tables, o pipeline deve ser configurado para usar o Databricks Runtime 15.2 ou superior.

Você lê o feed de dados de alteração de uma tabela de streaming de destino criada em um pipeline do Delta Live Tables da mesma forma que lê um feed de dados de alteração de outras tabelas do Delta. Para saber mais sobre como usar a funcionalidade de feed de dados de alteração do Delta, incluindo exemplos em Python e SQL, confira Usar o feed de dados de alterações do Delta Lake no Azure Databricks.

Observação

O registro do feed de dados de alteração inclui metadados que identificam o tipo de evento de alteração. Quando um registro é atualizado em uma tabela, os metadados dos registros de alteração associados normalmente incluem valores _change_type definidos como eventos update_preimage e update_postimage.

No entanto, os valores _change_type serão diferentes se forem feitas atualizações na tabela de streaming de destino que incluam a alteração de valores da chave primária. Quando as alterações incluem atualizações de chaves primárias, os campos de metadados _change_type são definidos como eventos insert e delete. As alterações nas chaves primárias podem ocorrer quando atualizações manuais são feitas em um dos campos-chave com uma instrução UPDATE ou MERGE ou, em tabelas SCD do tipo 2, quando o campo __start_at é alterado para refletir um valor de sequência inicial anterior.

A consulta APPLY CHANGES determina os valores de chave primária, que diferem para o processamento de SCD do tipo 1 e SCD do tipo 2:

  • No processamento de SCD do tipo 1 e na interface Python do Delta Live Tables, a chave primária é o valor do parâmetro keys na função apply_changes(). Na interface SQL do Delta Live Tables, a chave primária são as colunas definidas pela cláusula KEYS na instrução APPLY CHANGES INTO.
  • No SCD do tipo 2, a chave primária é o parâmetro keys ou a cláusula KEYS mais o valor retornado da operação coalesce(__START_AT, __END_AT), onde __START_AT e __END_AT são as colunas correspondentes da tabela de streaming de destino.

Obter dados sobre registros processados por uma consulta CDC do Delta Live Tables

Observação

As métricas a seguir são capturadas apenas por consultas APPLY CHANGES e não por consultas APPLY CHANGES FROM SNAPSHOT.

As seguintes métricas são capturadas por consultas APPLY CHANGES:

  • num_upserted_rows: o número de linhas de saída para as quais foi executado upsert no conjunto de dados durante uma atualização.
  • num_deleted_rows: o número de linhas de saída existentes excluídas do conjunto de dados durante uma atualização.

A métrica num_output_rows, a saída de fluxos que não são CDA, não é capturada para consultas apply changes.

Quais objetos de dados são usados para processamento CDFC de Delta Live Tables?

Nota: As estruturas de dados a seguir se aplicam somente ao processamento de APPLY CHANGES, não APPLY CHANGES FROM SNAPSHOT processamento.

Quando você declara a tabela de destino no metastore do Hive, duas estruturas de dados são criadas:

  • Uma exibição usando o nome atribuído à tabela de destino.
  • Uma tabela de backup interna usada por Delta Live Tables para gerenciar o processamento CDC. Esta tabela é nomeada anexando __apply_changes_storage_ ao nome da tabela de destino.

Por exemplo, se você declarar uma tabela de destino chamada dlt_cdc_target, verá uma exibição chamada dlt_cdc_target e uma tabela chamada __apply_changes_storage_dlt_cdc_target no metastore. A criação de uma exibição permite que o Delta Live Tables filtre as informações extras (por exemplo, marcas de exclusão e versões) necessárias para lidar com dados fora de ordem. Para exibir os dados processados, consulte a exibição de destino. Como o esquema da tabela __apply_changes_storage_ pode ser alterado para dar suporte a recursos ou aprimoramentos futuros, você não deve consultar a tabela para uso em produção. Se você adicionar dados manualmente à tabela, os registros deverão vir antes de outras alterações porque as colunas de versão estão ausentes.

Se um pipeline publicar no Catálogo do Unity, as tabelas de backup internas ficarão inacessíveis aos usuários.