Usar o feed de dados de alterações do Delta Lake no Azure Databricks

O feed de dados de alteração permite que o Azure Databricks acompanhe as alterações no nível de linha entre as versões de uma tabela Delta. Quando ele está habilitado em uma tabela Delta, o runtime registra eventos de alteração para todos os dados gravados na tabela. Isso inclui os dados de linha com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.

Importante

O feed de dados de alterações funciona em conjunto com o histórico de tabelas para fornecer informações de alteração. Como a clonagem de uma tabela Delta cria um histórico separado, o feed de dados de alterações em tabelas clonadas não corresponde ao da tabela original.

Processar incrementalmente os dados alterados

A Databricks recomenda a utilização do feed de dados alterados em combinação com o Streaming Estruturado para processar de forma incremental as alterações das tabelas Delta. Você deve usar o Streaming Estruturado para Azure Databricks para rastrear automaticamente as versões do feed de dados alterados da sua tabela.

Observação

Delta Live Tables fornece funcionalidade para fácil propagação de dados alterados e armazenamento de resultados como tabelas SCD (dimensão de alteração lenta) tipo 1 ou tipo 2. Consulte As APIs APPLY CHANGES: Simplifique a captura de dados de alteração com Delta Live Tables.

Para ler o feed de dados alterados de uma tabela, você deve habilitar o feed de dados alterados nessa tabela. Confira Habilitar o feed de dados de alterações.

Configure a opção readChangeFeed para true ao configurar um fluxo em uma tabela para ler o feed de dados alterados, conforme mostrado no exemplo de sintaxe a seguir:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Por padrão, o fluxo retorna o instantâneo mais recente da tabela quando o fluxo é iniciado pela primeira vez como um INSERT e as alterações futuras como dados alterados.

Os dados alterados são confirmados como parte da transação Delta Lake e ficam disponíveis ao mesmo tempo em que os novos dados são confirmados na tabela.

Opcionalmente, você pode especificar uma versão inicial. Veja Devo especificar uma versão inicial?.

O feed de dados alterados também oferece suporte à execução em lote, o que requer a especificação de uma versão inicial. Veja Ler alterações em consultas em lote.

Há suporte para opções como limites de taxa (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex na leitura de dados de alterações.

A limitação de taxa pode ser atômica para versões diferentes da versão inicial do instantâneo. Ou seja, toda a versão de confirmação será limitada por taxa ou a confirmação inteira será retornada.

Devo especificar uma versão inicial?

Opcionalmente, você pode especificar uma versão inicial se quiser ignorar as alterações que ocorreram antes de uma versão específica. Você pode especificar uma versão usando um carimbo de data/hora ou o número de ID da versão registrado no log de transações Delta.

Observação

Uma versão inicial é necessária para leituras em lote, e muitos padrões de lote podem se beneficiar da configuração de uma versão final opcional.

Ao configurar cargas de trabalho de streaming estruturado envolvendo feed de dados alterados, é importante entender como a especificação de uma versão inicial afeta o processamento.

Muitas cargas de trabalho de streaming, especialmente novos pipelines de processamento de dados, beneficiam-se do comportamento padrão. Com o comportamento padrão, o primeiro lote é processado quando o fluxo registra pela primeira vez todos os registros existentes na tabela como INSERT operações no feed de dados alterados.

Se sua tabela de destino já contém todos os registros com alterações apropriadas até um determinado ponto, especifique uma versão inicial para evitar o processamento do estado da tabela de origem como eventos INSERT.

O exemplo a seguir de recuperação de sintaxe de uma falha de streaming em que o ponto de verificação foi corrompido. Nesse exemplo, assuma as seguintes condições:

  1. O feed de dados alterados foi habilitado na tabela de origem durante a criação da tabela.
  2. A tabela downstream de destino processou todas as alterações até a versão 75, inclusive.
  3. O histórico de versões da tabela de origem está disponível para versões 70 e superiores.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

Nesse exemplo, você também deve especificar um novo local de ponto de verificação.

Importante

Se você especificar uma versão inicial, o fluxo falhará ao iniciar a partir de um novo ponto de verificação se a versão inicial não estiver mais presente no histórico da tabela. Delta Lake limpa versões históricas automaticamente, o que significa que todas as versões iniciais especificadas serão eventualmente excluídas.

Veja Posso usar o feed de dados alterados para reproduzir todo o histórico de uma tabela?.

Ler alterações em consultas em lote

Você pode usar a sintaxe de consulta em lote para ler todas as alterações a partir de uma versão específica ou para ler as alterações dentro de um intervalo especificado de versões.

Especifique uma versão como um inteiro e um carimbo de data/hora como uma cadeia de caracteres no formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

As versões inicial e final estão incluídas nas consultas. Para ler as alterações de uma versão inicial específica para a versão mais recente da tabela, especifique apenas a versão inicial.

Se você fornecer uma versão inferior ou um carimbo de data/hora anterior àquele que registrou os eventos de alteração, ou seja, quando o feed de dados de alterações foi habilitado, um erro será gerado indicando que o feed de dados de alterações não foi habilitado.

Os exemplos de sintaxe a seguir demonstram o uso de opções de versão inicial e final com leituras em lote:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Observação

Por padrão, se um usuário passar uma versão ou um carimbo de data/hora que exceda a última confirmação em uma tabela, o erro timestampGreaterThanLatestCommit será gerado. No Databricks Runtime 11.3 LTS e acima, o feed de dados de alterações poderá lidar com o caso de versão fora do intervalo se o usuário definir a configuração a seguir como true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Se você fornecer uma versão inicial maior que a última confirmação em uma tabela ou um carimbo de data/hora de início mais recente que a última confirmação em uma tabela, quando a configuração anterior estiver habilitada, um resultado de leitura vazio será retornado.

Se você fornecer uma versão final maior que a última confirmação em uma tabela ou um carimbo de data/hora de término mais recente que a última confirmação em uma tabela, quando a configuração anterior estiver habilitada no modo de leitura em lote, todas as alterações entre a versão inicial e a última confirmação serão retornadas.

Qual é o esquema para o feed de dados de alterações?

Quando você lê do feed de dados de alterações de uma tabela, o esquema para a versão mais recente da tabela é usado.

Observação

Há suporte total para a maioria das operações de alteração e evolução do esquema. Tabela com mapeamento de coluna habilitado não dá suporte a todos os casos de uso e demonstra um comportamento diferente. Confira Limitações do feed de dados de alterações para tabelas com mapeamento de coluna habilitado.

Além das colunas de dados do esquema da tabela Delta, o feed de dados de alterações contém colunas de metadados que identificam o tipo de evento de alteração:

Nome da coluna Type Valores
_change_type String insert, update_preimage , update_postimage, delete (1)
_commit_version long O log Delta ou a versão da tabela que contém a alteração.
_commit_timestamp Timestamp O carimbo de data/hora associado de quando a confirmação foi criada.

(1) preimage é o valor antes da atualização. postimage é o valor após a atualização.

Observação

Você não poderá habilitar o feed de dados de alterações em uma tabela se o esquema contiver colunas com os mesmos nomes que essas colunas adicionadas. Renomeie as colunas na tabela para resolver esse conflito antes de tentar habilitar o feed de dados de alterações.

Habilitar o feed de dados de alterações

Você só pode ler o feed de dados alterados para tabelas habilitadas. Habilite explicitamente a opção de feed de dados de alterações usando um dos seguintes métodos:

  • Nova tabela: defina a propriedade de tabela delta.enableChangeDataFeed = true no comando CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabela existente: defina a propriedade de tabela delta.enableChangeDataFeed = true no comando ALTER TABLE.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Todas as novas tabelas:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Importante

Somente as alterações feitas após habilitar o feed de dados alterados serão registradas. As alterações anteriores em uma tabela não são capturadas.

Alterar o armazenamento de dados

Habilitar o feed de dados alterados causa um pequeno aumento nos custos de armazenamento de uma tabela. Os registros de dados alterados são gerados à medida que a consulta é executada e geralmente são muito menores que o tamanho total dos arquivos reescritos.

O Azure Databricks registra os dados de alterações para operações UPDATE, DELETE e MERGE na pasta _change_data no diretório da tabela. Algumas operações, como operações somente de inserção e exclusões de partição completa, não geram dados no diretório _change_data porque o Azure Databricks pode calcular com eficiência o feed de dados alterados diretamente do log de transações.

Todas as leituras em arquivos de dados na pasta _change_data devem passar por APIs Delta Lake compatíveis.

Os arquivos na pasta _change_data seguem a política de retenção da tabela. Os dados do feed de dados alterados são excluídos quando o comando VACUUM é executado.

Posso usar o feed de dados alterados para reproduzir todo o histórico de uma tabela?

O feed de dados alterados não se destina a servir como um registro permanente de todas as alterações em uma tabela. O feed de dados alterado registra apenas as alterações que ocorrem após está habilitado.

O feed de dados alterados e o Delta Lake permitem que você sempre reconstrua um instantâneo completo de uma tabela de origem, o que significa que você pode iniciar uma nova leitura de streaming em uma tabela com o feed de dados alterados habilitado e capturar a versão atual dessa tabela e todas as alterações que ocorrerem depois.

Você deve tratar os registros no feed de dados alterados como transitórios e acessíveis somente durante uma janela de retenção especificada. O log de transações Delta remove as versões da tabela e suas versões correspondentes do feed de dados alterados em intervalos regulares. Quando uma versão é removida do log de transações, você não pode mais ler o feed de dados alterados dessa versão.

Se o seu caso de uso exigir a manutenção de um histórico permanente de todas as alterações em uma tabela, você deverá usar a lógica incremental para gravar registros do feed de dados alterados em uma nova tabela. O exemplo de código a seguir demonstra o uso de trigger.AvailableNow, que aproveita o processamento incremental do streaming estruturado, mas processa os dados disponíveis como uma carga de trabalho em lote. Você pode agendar essa carga de trabalho de forma assíncrona com seus principais pipelines de processamento para criar um backup do feed de dados alterados para fins de auditoria ou capacidade de reprodução total.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Limitações do feed de dados de alterações para tabelas com mapeamento de coluna habilitado

Com o mapeamento de coluna habilitado em uma tabela Delta, você pode remover ou renomear colunas na tabela sem reescrever arquivos de dados para dados existentes. Com o mapeamento de coluna habilitado, o feed de dados de alterações tem limitações após a execução de alterações de esquema não aditivas, como renomear ou descartar uma coluna, alterar o tipo de dados ou alterações de nulidade.

Importante

  • Não é possível ler o feed de dados de alterações para uma transação ou intervalo no qual ocorre uma alteração de esquema não aditiva usando semântica de lote.
  • No Databricks Runtime 12.2 LTS e versões anteriores, tabelas com mapeamento de coluna habilitado que tiveram alterações de esquema não aditivas não dão suporte a leituras de streaming no feed de dados de alterações. Confira Streaming com mapeamento de coluna e alterações de esquema.
  • No Databricks Runtime 11.3 LTS e versões anteriores, você não pode ler o feed de dados de alterações para tabelas com mapeamento de coluna habilitado que tenham passado por renomeação ou remoção de colunas.

No Databricks Runtime 12.2 LTS e versões superiores, você pode executar leituras em lote no feed de dados de alterações para tabelas com mapeamento de coluna habilitado que tiveram alterações de esquema não aditivas. Em vez de usar o esquema da versão mais recente da tabela, as operações de leitura usam o esquema da versão final da tabela especificada na consulta. As consultas ainda falharão se o intervalo de versão especificado abranger uma alteração de esquema não aditiva.