Usare il feed di dati delle modifiche delta Lake in Azure Databricks

Il feed di dati delle modifiche consente ad Azure Databricks di tenere traccia delle modifiche a livello di riga tra le versioni di una tabella Delta. Se abilitata in una tabella Delta, il runtime registra gli eventi di modifica per tutti i dati scritti nella tabella. Sono inclusi i dati di riga insieme ai metadati che indicano se la riga specificata è stata inserita, eliminata o aggiornata.

Importante

Il feed di dati delle modifiche funziona in combinazione con la cronologia delle tabelle per fornire informazioni sulle modifiche. Poiché la clonazione di una tabella Delta crea una cronologia separata, il feed di dati delle modifiche nelle tabelle clonate non corrisponde a quello della tabella originale.

Elaborare in modo incrementale i dati delle modifiche

Databricks consiglia di usare il feed di dati delle modifiche in combinazione con Structured Streaming per elaborare in modo incrementale le modifiche dalle tabelle Delta. È necessario usare Structured Streaming per Azure Databricks per tenere traccia automatica delle versioni per il feed di dati delle modifiche della tabella.

Nota

Le tabelle live delta offrono funzionalità per facilitare la propagazione dei dati delle modifiche e l'archiviazione dei risultati come dimensione a modifica lenta (dimensione a modifica lenta) di tipo 1 o di tipo 2 tabelle. Vedere LE API APPLY CHANGES: Semplificare Change Data Capture con le tabelle Live Delta.

Per leggere il feed di dati delle modifiche da una tabella, è necessario abilitare il feed di dati delle modifiche in tale tabella. Vedere Abilitare il feed di dati delle modifiche.

Impostare l'opzione readChangeFeed su true quando si configura un flusso su una tabella per leggere il feed di dati delle modifiche, come illustrato nell'esempio di sintassi seguente:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Per impostazione predefinita, il flusso restituisce lo snapshot più recente della tabella quando il flusso viene avviato per la prima volta come INSERT modifiche future come dati delle modifiche.

Il commit dei dati delle modifiche viene eseguito come parte della transazione Delta Lake e diventa disponibile contemporaneamente ai nuovi commit dei dati nella tabella.

Facoltativamente, è possibile specificare una versione iniziale. Vedere È consigliabile specificare una versione iniziale?.

Il feed di dati delle modifiche supporta anche l'esecuzione batch, che richiede la specifica di una versione iniziale. Vedere Leggere le modifiche nelle query batch.

Opzioni come limiti di frequenza (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex sono supportate anche durante la lettura dei dati delle modifiche.

La limitazione della frequenza può essere atomica per le versioni diverse dalla versione dello snapshot iniziale. Ovvero, l'intera versione del commit sarà limitata o verrà restituito l'intero commit.

È necessario specificare una versione iniziale?

Facoltativamente, è possibile specificare una versione iniziale se si desidera ignorare le modifiche apportate prima di una determinata versione. È possibile specificare una versione usando un timestamp o il numero di ID versione registrato nel log delle transazioni Delta.

Nota

È necessaria una versione iniziale per le letture batch e molti modelli batch possono trarre vantaggio dall'impostazione di una versione finale facoltativa.

Quando si configurano carichi di lavoro structured streaming che coinvolgono feed di dati delle modifiche, è importante comprendere come specificare una versione iniziale influisce sull'elaborazione.

Molti carichi di lavoro di streaming, in particolare le nuove pipeline di elaborazione dati, traggono vantaggio dal comportamento predefinito. Con il comportamento predefinito, il primo batch viene elaborato quando il flusso registra tutti i record esistenti nella tabella come INSERT operazioni nel feed di dati delle modifiche.

Se la tabella di destinazione contiene già tutti i record con modifiche appropriate fino a un determinato punto, specificare una versione iniziale per evitare di elaborare lo stato della tabella di origine come INSERT eventi.

La sintassi di esempio seguente viene ripristinata da un errore di streaming in cui il checkpoint è danneggiato. In questo esempio si presuppongono le condizioni seguenti:

  1. Il feed di dati delle modifiche è stato abilitato nella tabella di origine al momento della creazione della tabella.
  2. La tabella downstream di destinazione ha elaborato tutte le modifiche fino alla versione 75 e incluse.
  3. La cronologia delle versioni per la tabella di origine è disponibile per le versioni 70 e successive.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

In questo esempio è necessario specificare anche una nuova posizione del checkpoint.

Importante

Se si specifica una versione iniziale, il flusso non viene avviato da un nuovo checkpoint se la versione iniziale non è più presente nella cronologia delle tabelle. Delta Lake pulisce automaticamente le versioni cronologiche, ovvero tutte le versioni iniziali specificate vengono eliminate.

Vedere È possibile usare il feed di dati delle modifiche per riprodurre l'intera cronologia di una tabella?

Leggere le modifiche nelle query batch

È possibile usare la sintassi delle query batch per leggere tutte le modifiche a partire da una determinata versione o per leggere le modifiche all'interno di un intervallo specificato di versioni.

Specificare una versione come numero intero e un timestamp come stringa nel formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

Le versioni iniziali e finali sono incluse nelle query. Per leggere le modifiche da una determinata versione iniziale alla versione più recente della tabella, specificare solo la versione iniziale.

Se si specifica una versione precedente o un timestamp precedente a uno che ha registrato eventi di modifica, ovvero quando il feed di dati delle modifiche è stato abilitato, viene generato un errore che indica che il feed di dati delle modifiche non è stato abilitato.

Gli esempi di sintassi seguenti illustrano l'uso delle opzioni della versione iniziale e finale con letture batch:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Nota

Per impostazione predefinita, se un utente passa una versione o un timestamp che supera l'ultimo commit in una tabella, viene generato l'errore timestampGreaterThanLatestCommit . In Databricks Runtime 11.3 LTS e versioni successive, il feed di dati delle modifiche può gestire il caso di versione fuori intervallo se l'utente imposta la configurazione seguente su true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Se si specifica una versione iniziale maggiore dell'ultimo commit in una tabella o un timestamp di inizio più recente dell'ultimo commit in una tabella, quando la configurazione precedente è abilitata, viene restituito un risultato di lettura vuoto.

Se si specifica una versione finale maggiore dell'ultimo commit in una tabella o un timestamp di fine più recente dell'ultimo commit in una tabella, quando la configurazione precedente è abilitata in modalità di lettura batch, tutte le modifiche tra la versione iniziale e l'ultimo commit vengono restituite.

Qual è lo schema per il feed di dati delle modifiche?

Quando si legge dal feed di dati delle modifiche per una tabella, viene usato lo schema per la versione più recente della tabella.

Nota

La maggior parte delle operazioni di modifica ed evoluzione dello schema è completamente supportata. La tabella con mapping di colonne abilitata non supporta tutti i casi d'uso e dimostra un comportamento diverso. Vedere Modificare le limitazioni del feed di dati per le tabelle con mapping delle colonne abilitate.

Oltre alle colonne di dati dello schema della tabella Delta, il feed di dati delle modifiche contiene colonne di metadati che identificano il tipo di evento di modifica:

Nome colonna Type Valori
_change_type String insert, update_preimage , update_postimage, delete (1)
_commit_version Lungo Versione del log o della tabella Delta contenente la modifica.
_commit_timestamp Timestamp: Timestamp associato al momento della creazione del commit.

(1) preimage è il valore prima dell'aggiornamento, postimage è il valore dopo l'aggiornamento.

Nota

Non è possibile abilitare il feed di dati delle modifiche in una tabella se lo schema contiene colonne con gli stessi nomi di queste colonne aggiunte. Rinominare le colonne nella tabella per risolvere il conflitto prima di tentare di abilitare il feed di dati delle modifiche.

Abilitare il feed di dati delle modifiche

È possibile leggere solo il feed di dati delle modifiche per le tabelle abilitate. È necessario abilitare in modo esplicito l'opzione feed di dati delle modifiche usando uno dei metodi seguenti:

  • Nuova tabella: impostare la proprietà delta.enableChangeDataFeed = true table nel CREATE TABLE comando .

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabella esistente: impostare la proprietà delta.enableChangeDataFeed = true table nel ALTER TABLE comando .

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tutte le nuove tabelle:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Importante

Vengono registrate solo le modifiche apportate dopo aver abilitato il feed di dati delle modifiche. Le modifiche passate a una tabella non vengono acquisite.

Modificare l'archiviazione dei dati

L'abilitazione del feed di dati delle modifiche comporta un piccolo aumento dei costi di archiviazione per una tabella. I record di dati delle modifiche vengono generati durante l'esecuzione della query e in genere sono molto più piccoli delle dimensioni totali dei file riscritti.

Azure Databricks registra i dati delle modifiche per UPDATEle operazioni , DELETEe MERGE nella _change_data cartella nella directory della tabella. Alcune operazioni, ad esempio operazioni di sola inserimento ed eliminazioni di partizioni complete, non generano dati nella _change_data directory perché Azure Databricks può calcolare in modo efficiente il feed di dati delle modifiche direttamente dal log delle transazioni.

Tutte le letture sui file di dati nella _change_data cartella devono passare attraverso le API Delta Lake supportate.

I file nella _change_data cartella seguono i criteri di conservazione della tabella. I dati del feed di dati delle modifiche vengono eliminati quando viene eseguito il VACUUM comando.

È possibile usare il feed di dati delle modifiche per riprodurre l'intera cronologia di una tabella?

Il feed di dati delle modifiche non è destinato a fungere da record permanente di tutte le modifiche apportate a una tabella. Il feed di dati di modifica registra solo le modifiche apportate dopo l'abilitazione.

Il feed di dati delle modifiche e Delta Lake consentono di ricostruire sempre uno snapshot completo di una tabella di origine, ovvero è possibile avviare un nuovo flusso letto in una tabella con feed di dati delle modifiche abilitato e acquisire la versione corrente di tale tabella e tutte le modifiche apportate dopo.

È necessario considerare i record nel feed di dati delle modifiche come temporanei e accessibili solo per una finestra di conservazione specificata. Il log delle transazioni Delta rimuove le versioni delle tabelle e le corrispondenti versioni del feed di dati delle modifiche a intervalli regolari. Quando una versione viene rimossa dal log delle transazioni, non è più possibile leggere il feed di dati delle modifiche per tale versione.

Se il caso d'uso richiede la gestione di una cronologia permanente di tutte le modifiche apportate a una tabella, è consigliabile usare la logica incrementale per scrivere record dal feed di dati delle modifiche a una nuova tabella. L'esempio di codice seguente illustra l'uso trigger.AvailableNowdi , che sfrutta l'elaborazione incrementale di Structured Streaming, ma elabora i dati disponibili come carico di lavoro batch. È possibile pianificare questo carico di lavoro in modo asincrono con le pipeline di elaborazione principali per creare un backup del feed di dati delle modifiche per scopi di controllo o riproduzione completa.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Modificare le limitazioni del feed di dati per le tabelle con mapping di colonne abilitato

Con il mapping delle colonne abilitato in una tabella Delta, è possibile eliminare o rinominare colonne nella tabella senza riscrivere i file di dati per i dati esistenti. Con il mapping delle colonne abilitato, il feed di dati delle modifiche presenta limitazioni dopo l'esecuzione di modifiche dello schema non additive, ad esempio la ridenominazione o l'eliminazione di una colonna, la modifica del tipo di dati o le modifiche di valori Null.

Importante

  • Non è possibile leggere il feed di dati delle modifiche per una transazione o un intervallo in cui si verifica una modifica dello schema non additivi usando la semantica batch.
  • In Databricks Runtime 12.2 LTS e versioni successive le tabelle con mapping di colonna abilitato che hanno subito modifiche dello schema non additive non supportano le letture di streaming nel feed di dati delle modifiche. Vedere Streaming con mapping di colonne e modifiche dello schema.
  • In Databricks Runtime 11.3 LTS e versioni successive non è possibile leggere il feed di dati delle modifiche per le tabelle con mapping di colonne abilitato che hanno riscontrato la ridenominazione o l'eliminazione delle colonne.

In Databricks Runtime 12.2 LTS e versioni successive è possibile eseguire letture batch nel feed di dati delle modifiche per le tabelle con mapping di colonne abilitato che hanno subito modifiche dello schema non additive. Anziché usare lo schema della versione più recente della tabella, le operazioni di lettura usano lo schema della versione finale della tabella specificata nella query. Le query continuano a non riuscire se l'intervallo di versioni specificato si estende su una modifica dello schema non additivi.