Använda Delta Lake-ändringsdataflöde i Azure Databricks

Med ändringsdataflöde kan Azure Databricks spåra ändringar på radnivå mellan versioner av en Delta-tabell. När den är aktiverad i en Delta-tabell registrerar körningen ändringshändelser för alla data som skrivits till tabellen. Detta inkluderar raddata tillsammans med metadata som anger om den angivna raden infogades, togs bort eller uppdaterades.

Viktigt!

Ändringsdataflödet fungerar tillsammans med tabellhistoriken för att tillhandahålla ändringsinformation. Eftersom kloning av en Delta-tabell skapar en separat historik matchar inte ändringsdataflödet för klonade tabeller den ursprungliga tabellens.

Bearbeta ändringsdata stegvis

Databricks rekommenderar att du använder ändringsdataflöde i kombination med Strukturerad direktuppspelning för att stegvis bearbeta ändringar från Delta-tabeller. Du måste använda Structured Streaming för Azure Databricks för att automatiskt spåra versioner för tabellens ändringsdataflöde.

Kommentar

Delta Live Tables ger funktioner för enkel spridning av ändringsdata och lagring av resultat som SCD-tabeller (långsamt föränderliga dimensioner) typ 1 eller typ 2. Se API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables.

Om du vill läsa ändringsdataflödet från en tabell måste du aktivera ändringsdataflöde i tabellen. Se Aktivera ändringsdataflöde.

Ange alternativet readChangeFeed till true när du konfigurerar en dataström mot en tabell för att läsa ändringsdataflödet, som du ser i följande syntaxexempel:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Som standard returnerar strömmen den senaste ögonblicksbilden av tabellen när strömmen först startar som en INSERT och framtida ändringar som ändringsdata.

Ändra dataincheckningar som en del av Delta Lake-transaktionen och blir tillgängliga samtidigt som de nya data checkas in i tabellen.

Du kan också ange en startversion. Se Ska jag ange en startversion?.

Ändringsdataflöde stöder även batchkörning, vilket kräver att du anger en startversion. Se Läsa ändringar i batchfrågor.

Alternativ som hastighetsbegränsningar (maxFilesPerTrigger, maxBytesPerTrigger) och excludeRegex stöds även när du läser ändringsdata.

Hastighetsbegränsning kan vara atomiska för andra versioner än den första ögonblicksbildversionen. Det innebär att hela incheckningsversionen kommer att vara frekvensbegränsad eller så returneras hela incheckningen.

Ska jag ange en startversion?

Du kan också ange en startversion om du vill ignorera ändringar som inträffat före en viss version. Du kan ange en version med hjälp av en tidsstämpel eller versions-ID-numret som registrerats i Delta-transaktionsloggen.

Kommentar

En startversion krävs för batchläsningar, och många batchmönster kan ha nytta av att ange en valfri slutversion.

När du konfigurerar arbetsbelastningar för strukturerad direktuppspelning med ändringsdataflöde är det viktigt att förstå hur det påverkar bearbetningen att ange en startversion.

Många strömningsarbetsbelastningar, särskilt nya databehandlingspipelines, drar nytta av standardbeteendet. Med standardbeteendet bearbetas den första batchen när strömmen först registrerar alla befintliga poster i tabellen som INSERT åtgärder i ändringsdataflödet.

Om måltabellen redan innehåller alla poster med lämpliga ändringar upp till en viss punkt anger du en startversion för att undvika att bearbeta källtabelltillståndet som INSERT händelser.

Följande exempelsyntax återställs från ett strömningsfel där kontrollpunkten var skadad. I det här exemplet förutsätter du följande villkor:

  1. Ändringsdataflöde aktiverades i källtabellen när tabellen skapades.
  2. Den underordnade måltabellen har bearbetat alla ändringar fram till och med version 75.
  3. Versionshistorik för källtabellen är tillgänglig för version 70 och senare.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

I det här exemplet måste du också ange en ny kontrollpunktsplats.

Viktigt!

Om du anger en startversion kan strömmen inte starta från en ny kontrollpunkt om startversionen inte längre finns i tabellhistoriken. Delta Lake rensar upp historiska versioner automatiskt, vilket innebär att alla angivna startversioner så småningom tas bort.

Se Kan jag använda ändringsdataflöde för att spela upp hela historiken för en tabell?.

Läsa ändringar i batchfrågor

Du kan använda batchfrågesyntax för att läsa alla ändringar från en viss version eller för att läsa ändringar inom ett angivet versionsintervall.

Du anger en version som ett heltal och en tidsstämplar som en sträng i formatet yyyy-MM-dd[ HH:mm:ss[.SSS]].

Start- och slutversionerna är inkluderande i frågorna. Om du vill läsa ändringarna från en viss startversion till den senaste versionen av tabellen anger du endast startversionen.

Om du anger en lägre version eller en tidsstämpel som är äldre än en som har registrerat ändringshändelser, dvs. när ändringsdataflödet aktiverades, utlöses ett fel som anger att ändringsdataflödet inte har aktiverats.

Följande syntaxexempel visar hur du använder alternativ för att starta och avsluta version med batchläsningar:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Kommentar

Om en användare som standard skickar in en version eller tidsstämpel som överskrider den senaste incheckningen i en tabell utlöses felet timestampGreaterThanLatestCommit . I Databricks Runtime 11.3 LTS och senare kan ändringsdataflödet hantera versionsfallet för out-of-range om användaren anger följande konfiguration till true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Om du anger en startversion som är större än den senaste incheckningen i en tabell eller en starttidsstämpel som är nyare än den senaste incheckningen i en tabell returneras ett tomt läsresultat när föregående konfiguration är aktiverad.

Om du anger en slutversion som är större än den senaste incheckningen i en tabell eller en sluttidsstämpel som är nyare än den senaste incheckningen i en tabell, returneras alla ändringar mellan startversionen och den senaste incheckningen när den föregående konfigurationen är aktiverad i batchläsningsläge.

Vad är schemat för ändringsdataflödet?

När du läser från ändringsdataflödet för en tabell används schemat för den senaste tabellversionen.

Kommentar

De flesta schemaändringar och utvecklingsåtgärder stöds fullt ut. Tabell med kolumnmappning aktiverat stöder inte alla användningsfall och visar olika beteende. Se Ändra dataflödesbegränsningar för tabeller med kolumnmappning aktiverat.

Förutom datakolumnerna från schemat i Delta-tabellen innehåller ändringsdataflödet metadatakolumner som identifierar typen av ändringshändelse:

Kolumnnamn Typ Värden
_change_type String insert, update_preimage , update_postimage, delete (1)
_commit_version Long Deltaloggen eller tabellversionen som innehåller ändringen.
_commit_timestamp Tidsstämpel Tidsstämpeln som var associerad när incheckningen skapades.

(1) preimage är värdet före uppdateringen, postimage är värdet efter uppdateringen.

Kommentar

Du kan inte aktivera ändringsdataflöde i en tabell om schemat innehåller kolumner med samma namn som de tillagda kolumnerna. Byt namn på kolumner i tabellen för att lösa den här konflikten innan du försöker aktivera ändringsdataflöde.

Aktivera ändringsdataflöde

Du kan bara läsa ändringsdataflödet för aktiverade tabeller. Du måste uttryckligen aktivera alternativet ändra dataflöde med någon av följande metoder:

  • Ny tabell: Ange tabellegenskapen CREATE TABLE delta.enableChangeDataFeed = true i kommandot .

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Befintlig tabell: Ange tabellegenskapen ALTER TABLE delta.enableChangeDataFeed = true i kommandot .

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Alla nya tabeller:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Viktigt!

Endast ändringar som görs när du har aktiverat ändringsdataflödet registreras. Tidigare ändringar i en tabell registreras inte.

Ändra datalagring

Om du aktiverar ändringsdataflöde medför det en liten ökning av lagringskostnaderna för en tabell. Ändringsdataposter genereras när frågan körs och är vanligtvis mycket mindre än den totala storleken på omskrivna filer.

Azure Databricks registrerar ändringsdata för UPDATE, DELETEoch MERGE åtgärder i _change_data mappen under tabellkatalogen. Vissa åtgärder, till exempel åtgärder med endast infogning och borttagningar av fullständig partition, genererar inte data i _change_data katalogen eftersom Azure Databricks effektivt kan beräkna ändringsdataflödet direkt från transaktionsloggen.

Alla läsningar mot datafiler i _change_data mappen bör gå igenom Delta Lake-API:er som stöds.

Filerna i _change_data mappen följer kvarhållningsprincipen för tabellen. Ändra dataflödesdata tas bort när VACUUM kommandot körs.

Kan jag använda ändringsdataflöde för att spela upp hela historiken för en tabell?

Ändringsdataflöde är inte avsett att fungera som en permanent post för alla ändringar i en tabell. Ändra dataflöde registrerar endast ändringar som inträffar när det har aktiverats.

Med Ändra dataflöde och Delta Lake kan du alltid rekonstruera en fullständig ögonblicksbild av en källtabell, vilket innebär att du kan starta en ny direktuppspelningsläsning mot en tabell med ändringsdataflöde aktiverat och avbilda den aktuella versionen av tabellen och alla ändringar som sker efter.

Du måste behandla poster i ändringsdataflödet som tillfälliga och endast tillgängliga för ett angivet kvarhållningsfönster. Delta-transaktionsloggen tar bort tabellversioner och deras motsvarande ändringsdataflödesversioner med jämna mellanrum. När en version tas bort från transaktionsloggen kan du inte längre läsa ändringsdataflödet för den versionen.

Om ditt användningsfall kräver en permanent historik över alla ändringar i en tabell bör du använda inkrementell logik för att skriva poster från ändringsdataflödet till en ny tabell. Följande kodexempel visar hur du använder trigger.AvailableNow, som utnyttjar inkrementell bearbetning av strukturerad direktuppspelning men bearbetar tillgängliga data som en batcharbetsbelastning. Du kan schemalägga den här arbetsbelastningen asynkront med dina huvudsakliga bearbetningspipelines för att skapa en säkerhetskopia av ändringsdataflödet för granskningsändamål eller fullständig återspelning.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Ändra dataflödesbegränsningar för tabeller med kolumnmappning aktiverat

Med kolumnmappning aktiverat i en Delta-tabell kan du släppa eller byta namn på kolumner i tabellen utan att skriva om datafiler för befintliga data. När kolumnmappningen är aktiverad har ändringsdataflödet begränsningar efter att ha utfört icke-additiva schemaändringar som att byta namn på eller släppa en kolumn, ändra datatyp eller nullabilitetsändringar.

Viktigt!

  • Du kan inte läsa ändringsdataflöde för en transaktion eller ett intervall där en icke-additiv schemaändring sker med hjälp av batch-semantik.
  • I Databricks Runtime 12.2 LTS och nedan har tabeller med kolumnmappning aktiverats som har upplevt icke-additiva schemaändringar inte stöd för strömmande läsningar i ändringsdataflöde. Mer information finns i Strömning med kolumnmappning och schemaändringar.
  • I Databricks Runtime 11.3 LTS och nedan kan du inte läsa ändra dataflöde för tabeller med kolumnmappning aktiverat som har fått kolumnbyte eller borttagning.

I Databricks Runtime 12.2 LTS och senare kan du utföra batchläsningar på ändringsdataflöde för tabeller med kolumnmappning aktiverat som har upplevt icke-additiva schemaändringar. I stället för att använda schemat för den senaste versionen av tabellen använder läsåtgärder schemat för slutversionen av tabellen som anges i frågan. Frågor misslyckas fortfarande om det angivna versionsintervallet sträcker sig över en icke-additiv schemaändring.