Weitere Informationen finden Sie unter APPLY CHANGES-API: Vereinfachtes CDC (Change Data Capture) in Delta Live Tables
Delta Live Tables vereinfacht die Change Data Capture (CDC) mit den APPLY CHANGES
und APPLY CHANGES FROM SNAPSHOT
-APIs. Die verwendete Schnittstelle hängt von der Quelle der Änderungsdaten ab:
- Verwenden Sie
APPLY CHANGES
, um Änderungen aus einem Änderungsdatenfeed (CDF) zu verarbeiten. - Verwenden Sie
APPLY CHANGES FROM SNAPSHOT
(öffentliche Vorschau) zum Verarbeiten von Änderungen in Datenbankmomentaufnahmen.
Zuvor wurde die MERGE INTO
-Anweisung üblicherweise für die Verarbeitung von CDC-Datensätzen auf Azure Databricks verwendet. MERGE INTO
kann jedoch falsche Ergebnisse aufgrund von Out-of-Sequence-Datensätzen erzeugen oder komplexe Logik zum erneuten Anordnen von Datensätzen erfordern.
Die APPLY CHANGES
-API wird in den SQL- und Python-Schnittstellen Delta Live Tables unterstützt. Die APPLY CHANGES FROM SNAPSHOT
-API wird in der Python-Schnittstelle Delta Live Tables unterstützt.
Sowohl APPLY CHANGES
als auch APPLY CHANGES FROM SNAPSHOT
unterstützen das Aktualisieren von Tabellen mit SCD-Typ 1 und Typ 2:
- Verwenden Sie SCD-Typ 1, um Datensätze direkt zu aktualisieren. Der Verlauf für aktualisierte Datensätze wird nicht aufbewahrt.
- Verwenden Sie SCD-Typ 2, um einen Verlauf von Datensätzen beizubehalten, entweder für alle Updates, oder für Updates für einen angegebenen Satz von Spalten.
Syntax und andere Verweise finden Sie unter:
- Ändern der Datenerfassung aus einem Änderungsfeed mit Python in Delta Live Tables
- Change Data Capture mit SQL in Delta Live Tables
Hinweis
In diesem Artikel wird beschrieben, wie Tabellen in Ihrer Delta Live Tables-Pipeline basierend auf Änderungen in Quelldaten aktualisiert werden. Informationen zum Aufzeichnen und Abfragen von Änderungsinformationen auf Zeilenebene für Delta-Tabellen finden Sie unter Verwenden Sie den Delta Lake-Änderungs-Datenfeed in Azure Databricks.
Anforderungen
Um die CDC-APIs zu verwenden, muss Ihre Pipeline so konfiguriert sein, dass serverlose DLT-Pipelines oder die Delta Live Tables Pro
- oder Advanced
-Editionen verwendet werden.
Wie wird CDC mit der APPLY CHANGES
-API implementiert?
Durch die automatische Verarbeitung von Out-of-Sequence-Datensätzen stellt die APPLY CHANGES
-API in Delta-Live Tables die korrekte Verarbeitung von CDC-Datensätzen sicher und entfernt die Notwendigkeit, komplexe Logik für die Behandlung von Out-of-Sequence-Datensätzen zu entwickeln. Sie müssen eine Spalte in den Quelldaten angeben, für die Datensätze sequenziert werden sollen, die Delta Live Tables als monoton zunehmende Darstellung der richtigen Reihenfolge der Quelldaten interpretiert. Delta Live Tables verarbeitet automatisch Daten, die außerhalb der Reihenfolge ankommen. Für SCD-Typ 2-Änderungen verteilt Delta Live Tables die entsprechenden Sequenzierungswerte an dien Spalten __START_AT
und __END_AT
der Zieltabelle. Für jeden Sequenzierungswert sollte ein eindeutiges Update pro Schlüssel vorhanden sein, und NULL-Sequenzierungswerte werden nicht unterstützt.
Zum Ausführen der CDC-Verarbeitung mit APPLY CHANGES
erstellen Sie zuerst eine Streamingtabelle und verwenden dann die APPLY CHANGES INTO
-Anweisung in SQL oder die apply_changes()
-Funktion in Python, um die Quelle, Schlüssel und Sequenzierung für den Änderungsfeed anzugeben. Verwenden Sie zum Erstellen der Zielstreamingtabelle die CREATE OR REFRESH STREAMING TABLE
-Anweisung in SQL oder die create_streaming_table()
-Funktion in Python. Weitere Informationen finden Sie in den Beispielen SCD Typ 1 und Typ 2 für die Verarbeitung.
Ausführliche Informationen zur Syntax finden Sie in der Delta Live Tables SQL-Referenz oder Python-Referenz.
Wie wird CDC mit der APPLY CHANGES FROM SNAPSHOT
-API implementiert?
Wichtig
Die APPLY CHANGES FROM SNAPSHOT
-API befindet sich in der öffentlichen Vorschau.
APPLY CHANGES FROM SNAPSHOT
ist eine deklarative API, mit der Änderungen an Quelldaten effizient ermittelt werden, indem eine Reihe von Momentaufnahmen in Reihenfolge verglichen und dann die für die CDC-Verarbeitung der Datensätze in den Momentaufnahmen erforderliche Verarbeitung ausgeführt wird. APPLY CHANGES FROM SNAPSHOT
wird nur von der Python-Schnittstelle Delta Live Tables unterstützt.
APPLY CHANGES FROM SNAPSHOT
unterstützt das Aufnehmen von Momentaufnahmen aus mehreren Quelltypen:
- Verwenden Sie die regelmäßige Erfassung von Momentaufnahmen, um Momentaufnahmen aus einer vorhandenen Tabelle oder Ansicht aufzunehmen.
APPLY CHANGES FROM SNAPSHOT
verfügt über eine einfache, optimierte Schnittstelle, die das regelmäßige Aufnehmen von Momentaufnahmen aus einem vorhandenen Datenbankobjekt unterstützt. Eine neue Momentaufnahme wird mit jedem Pipelineupdate aufgenommen, und die Aufnahmezeit wird als Momentaufnahmeversion verwendet. Wenn eine Pipeline im fortlaufenden Modus ausgeführt wird, werden mehrere Momentaufnahmen mit jeder Pipelineaktualisierung in einem Zeitraum aufgenommen, der durch die Triggerintervall-Einstellung für den Fluss bestimmt wird, der die APPLY CHANGES FROM SNAPSHOT-Verarbeitung enthält. - Verwenden Sie historische Momentaufnahmen zum Verarbeiten von Dateien, die Datenbankmomentaufnahmen enthalten, z. B. Momentaufnahmen, die aus einer Oracle- oder MySQL-Datenbank oder einem Data Warehouse generiert wurden.
Zum Ausführen der CDC-Verarbeitung von einem beliebigen Quelltyp mit APPLY CHANGES FROM SNAPSHOT
erstellen Sie zuerst eine Streamingtabelle und verwenden dann die apply_changes_from_snapshot()
-Funktion in Python, um die Momentaufnahme, Schlüssel und andere Argumente anzugeben, die zum Implementieren der Verarbeitung erforderlich sind. Sehen Sie sich die Beispiele für die regelmäßige Aufnahme von Momentaufnahmen und historische Aufnahme von Momentaufnahmen an.
Die Momentaufnahmen, die an die API übergeben werden, müssen in aufsteigender Reihenfolge nach Version sein. Wenn Delta Live Tables eine Momentaufnahme aus der Reihenfolge erkennt, wird ein Fehler ausgelöst.
Ausführliche Informationen zur Syntax finden Sie in der Python-Referenz zu Delta Live Tables.
Begrenzungen
Die für die Sequenzierung verwendete Spalte muss ein sortierbarer Datentyp sein.
Beispiel: SCD-Typ 1- und SCD-Typ 2-Verarbeitung mit CDF-Quelldaten
Die folgenden Abschnitte enthalten Beispiele für Delta Live Tables SCD-Typ 1 und Typ 2-Abfragen, die Zieltabellen basierend auf Quellereignissen aus einem Änderungsdatenfeed aktualisieren:
- Erstellt neue Benutzerdatensätze.
- Löscht einen Benutzerdatensatz.
- Aktualisiert Benutzerdatensätze. In dem Beispiel vom SCD-Typ 1 kommen die letzten
UPDATE
-Vorgänge zu spät an und werden aus der Zieltabelle gelöscht, um die Behandlung von Ereignissen in nicht ordnungsgemäßer Reihenfolge zu veranschaulichen.
In den folgenden Beispielen wird davon ausgegangen, dass Sie mit der Konfiguration und Aktualisierung von Delta Live Tables-Pipelines vertraut sind. Siehe Tutorial: Ausführen Ihrer ersten Delta Live Tables-Pipeline.
Zum Ausführen dieser Beispiele müssen Sie zunächst ein Beispiel-Dataset erstellen. Siehe Generieren von Testdaten.
Im Folgenden sind die Eingabedatensätze für diese Beispiele aufgeführt:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | NULL | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Wenn Sie die letzte Zeile in den Beispieldaten auskommentieren, wird der folgende Datensatz eingefügt, der angibt, wo die Datensätze abgeschnitten werden sollen:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
null | NULL | null | TRUNCATE | 3 |
Hinweis
Alle folgenden Beispiele enthalten Optionen zum Angeben von DELETE
- und TRUNCATE
-Vorgängen, die jedoch optional sind.
Aktualisierungen des SCD-Typs 1 verarbeiten
Im folgenden Beispiel wird die Verarbeitung von SCD-Typ 1-Updates veranschaulicht:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Nach dem Ausführen des Beispiels vom SCD-Typ 1 enthält die Zieltabelle die folgenden Datensätze:
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
Nach der Ausführung des SCD-Typ 1-Beispiels mit dem zusätzlichen TRUNCATE
-Datensatz werden die Datensätze 124
und 126
aufgrund des TRUNCATE
-Vorgangs bei sequenceNum=3
abgeschnitten, und die Zieltabelle enthält den folgenden Datensatz:
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
Aktualisierungen des SCD-Typs 2 verarbeiten
Im folgenden Codebeispiel wird die Verarbeitung von SCD-Typ 2-Updates veranschaulicht:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Nach dem Ausführen des SCD Typ 2-Beispiels enthält die Zieltabelle die folgenden Datensätze:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | NULL |
Eine SCD-Typ 2-Abfrage kann auch eine Teilmenge der Ausgabespalten angeben, die im Hinblick auf den Verlauf in der Zieltabelle nachverfolgt werden sollen. Änderungen an anderen Spalten werden aktualisiert, statt dass neue Verlaufsdatensätze generiert werden. Im folgenden Beispiel wird veranschaulicht, dass die Spalte city
von der Nachverfolgung ausgeschlossen wird:
Im folgenden Beispiel wird die Verwendung des Verlaufs mit SCD-Typ 2 veranschaulicht:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Nach dem Ausführen dieses Beispiels ohne den zusätzlichen TRUNCATE
-Datensatz enthält die Zieltabelle die folgenden Datensätze:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
Generieren von Testdaten
Der folgende Code wird bereitgestellt, um ein Beispiel-Dataset für die Verwendung in den Beispielabfragen zu generieren, die in diesem Lernprogramm vorhanden sind. Wenn Sie über die richtigen Anmeldeinformationen verfügen, um ein neues Schema zu erstellen und eine neue Tabelle zu erstellen, können Sie diese Anweisungen entweder mit einem Notebook oder mit Databricks SQL ausführen. Der folgende Code sollte nicht als Teil einer Delta Live Tables-Pipeline ausgeführt werden:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Beispiel: Regelmäßige Momentaufnahmeverarbeitung
Das folgende Beispiel veranschaulicht die SCD-Typ 2-Verarbeitung, die Momentaufnahmen einer Tabelle erfasst, die bei mycatalog.myschema.mytable
gespeichert wird. Die Ergebnisse der Verarbeitung werden in eine Tabelle mit dem Namen target
geschrieben.
mycatalog.myschema.mytable
erfasst zum Zeitstempel 2024-01-01 00:00:00
Schlüssel | Wert |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
erfasst zum Zeitstempel 2024-01-01 12:00:00
Schlüssel | Wert |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Nach der Verarbeitung der Momentaufnahmen enthält die Zieltabelle die folgenden Datensätze:
Schlüssel | Wert | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | NULL |
3 | a3 | 2024-01-01 12:00:00 | NULL |
Beispiel: Verarbeitung von historischen Momentaufnahmen
Das folgende Beispiel veranschaulicht die SCD-Typ 2-Verarbeitung, die eine Zieltabelle basierend auf Quellereignissen aus zwei Momentaufnahmen aktualisiert, die in einem Cloudspeichersystem gespeichert sind:
Momentaufnahme bei timestamp
, gespeichert in /<PATH>/filename1.csv
Schlüssel | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
Momentaufnahme bei timestamp + 5
, gespeichert in /<PATH>/filename2.csv
Schlüssel | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
Im folgenden Codebeispiel wird die Verarbeitung von SCD-Typ 2-Updates mit diesen Momentaufnahmen veranschaulicht:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Nach der Verarbeitung der Momentaufnahmen enthält die Zieltabelle die folgenden Datensätze:
Schlüssel | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | NULL |
3 | a3 | b3 | 2 | NULL |
4 | a4 | b4_new | 1 | NULL |
Hinzufügen, Ändern oder Löschen von Daten in einer Zielstreamingtabelle
Wenn Ihre Pipeline Tabellen im Unity Catalog veröffentlicht, können Sie Datenbearbeitungssprache (DML)-Anweisungen verwenden, einschließlich Einfüge-, Aktualisierungs-, Lösch- und Zusammenführungsanweisungen, um die von APPLY CHANGES INTO
-Anweisungen erstellten Zielstreamingtabellen zu ändern.
Hinweis
- DML-Anweisungen, die das Tabellenschema einer Streamingtabelle ändern, werden nicht unterstützt. Stellen Sie sicher, dass Ihre DML-Anweisungen nicht versuchen, das Tabellenschema weiterzuentwickeln.
- DML-Anweisungen, die eine Streamingtabelle aktualisieren, können nur in einem freigegebenen Unity Catalog-Cluster oder einem SQL-Warehouse mit Databricks Runtime 13.3 LTS und höher ausgeführt werden.
- Da für das Streaming Datenquellen im Nur-Anfügen-Modus benötigt werden, setzen Sie das Flag skipChangeCommits beim Lesen der Streaming-Quelltabelle, wenn Ihre Verarbeitung Streaming aus einer Streaming-Quelltabelle mit Änderungen (z. B. durch DML-Anweisungen) erfordert. Wenn
skipChangeCommits
festgelegt ist, werden Transaktionen, die Datensätze in der Quelltabelle löschen oder ändern, ignoriert. Wenn Ihre Verarbeitung keine Streaming-Tabelle erfordert, können Sie eine materialisierte Ansicht (die nicht die Einschränkung „nur Anhängen“ hat) als Zieltabelle verwenden.
Da Delta Live Tables eine angegebene SEQUENCE BY
-Spalte verwendet und geeignete Sequenzierungswerte an die __START_AT
- und __END_AT
-Spalten der Zieltabelle (für SCD-Typ 2) verteilt, müssen Sie sicherstellen, dass DML-Anweisungen gültige Werte für diese Spalten verwenden, um die richtige Reihenfolge von Datensätzen aufrechtzuerhalten. Lesen Sie Wie wird CDC mit der APPLY CHANGES-API implementiert?.
Weitere Informationen zum Verwenden von DML-Anweisungen mit Streamingtabellen finden Sie unter Hinzufügen, Ändern oder Löschen von Daten in einer Streamingtabelle.
Im folgenden Beispiel wird ein aktiver Datensatz mit einer Startsequenz von 5 eingefügt:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Lesen eines Änderungsdatenfeeds aus einer APPLY CHANGES
-Zieltabelle
In Databricks Runtime 15.2 und höher können Sie einen Änderungsdatenfeed aus einer Streamingtabelle, die das Ziel von APPLY CHANGES
- oder APPLY CHANGES FROM SNAPSHOT
-Abfragen ist, auf die gleiche Weise lesen, wie Sie einen Änderungsdatenfeed aus anderen Delta-Tabellen lesen. Nachfolgend finden Sie Informationen zum Lesen des Änderungsdatenfeeds aus einer Zielstreamingtabelle:
- Die Zielstreamingtabelle muss in Unity Catalog veröffentlicht werden. Weitere Informationen finden Sie unter Verwenden von Unity Catalog mit Ihren Delta Live Tables-Pipelines.
- Um den Änderungsdatenfeed aus der Zielstreamingtabelle zu lesen, müssen Sie Databricks Runtime 15.2 oder höher verwenden. Um den Änderungsdatenfeed in einer anderen Delta Live Tables-Pipeline zu lesen, muss die Pipeline für die Verwendung von Databricks Runtime 15.2 oder höher konfiguriert werden.
Das Lesen des Änderungsdatenfeeds aus einer Zielstreamingtabelle, die in einer Delta Live Tables-Pipeline erstellt wurde, funktioniert genauso wie das Lesen eines Änderungsdatenfeeds aus anderen Delta-Tabellen. Weitere Informationen zur Verwendung der Delta-Änderungsdatenfeedfunktionen, einschließlich Beispielen in Python und SQL, finden Sie unter Verwenden des Delta Lake-Änderungsdatenfeeds in Azure Databricks.
Hinweis
Der Datensatz für Änderungsdatenfeed enthält Metadaten, die den Typ des Änderungsereignisses identifizieren. Wenn ein Datensatz in einer Tabelle aktualisiert wird, enthalten die Metadaten für die zugehörigen Änderungsdatensätze in der Regel _change_type
-Werte, die auf update_preimage
- und update_postimage
-Ereignisse festgelegt sind.
Die _change_type
-Werte unterscheiden sich jedoch, wenn Aktualisierungen an der Zielstreamingtabelle vorgenommen werden, die Änderungen der Primärschlüsselwerte umfassen. Wenn Änderungen Aktualisierungen an Primärschlüsseln enthalten, werden die _change_type
-Metadatenfelder auf insert
- und delete
-Ereignisse festgelegt. Änderungen an Primärschlüsseln können auftreten, wenn manuelle Aktualisierungen an einem der Schlüsselfelder mit einer UPDATE
- oder MERGE
-Anweisung oder für SCD-Typ 2-Tabellen vorgenommen werden, wenn sich das __start_at
-Feld ändert, um einen früheren Startsequenzwert widerzuspiegeln.
Die APPLY CHANGES
-Abfrage bestimmt die Primärschlüsselwerte, die sich für die SCD-Typ 1- und SCD-Typ 2-Verarbeitung unterscheiden:
- Bei der SCD-Typ 1-Verarbeitung und der Delta Live Tables-Schnittstelle für Python ist der Primärschlüssel der Wert des
keys
-Parameters in derapply_changes()
-Funktion. Für die Delta Live Tables-Schnittstelle für SQL ist der Primärschlüssel die Spalten, die durch dieKEYS
-Klausel in derAPPLY CHANGES INTO
-Anweisung definiert werden. - Bei SCD-Typ 2 ist der Primärschlüssel der
keys
-Parameter oder dieKEYS
-Klausel sowie der Rückgabewert descoalesce(__START_AT, __END_AT)
-Vorgangs, wobei__START_AT
und__END_AT
die entsprechenden Spalten aus der Zielstreamingtabelle sind.
Abrufen von Daten zu Datensätzen, die von einer Delta Live Tables-CDC-Abfrage verarbeitet werden
Hinweis
Die folgenden Metriken werden nur von APPLY CHANGES
-Abfragen und nicht von APPLY CHANGES FROM SNAPSHOT
-Abfragen erfasst.
Die folgenden Metriken werden von APPLY CHANGES
-Abfragen erfasst:
num_upserted_rows
: Die Anzahl der Ausgabezeilen, für die während einer Aktualisierung ein Upsertvorgang im Dataset ausgeführt wird.num_deleted_rows
: Die Anzahl vorhandener Ausgabezeilen, die während einer Aktualisierung aus dem Dataset gelöscht wurden.
Die Metrik num_output_rows
, die für Nicht-CDC-Flows ausgegeben wird, wird nicht für apply changes
-Abfragen erfasst.
Welche Datenobjekte werden für die CDC-Verarbeitung von Delta Live Tables verwendet?
Hinweis: Die folgenden Datenstrukturen gelten nur für APPLY CHANGES
-Verarbeitung, nicht für APPLY CHANGES FROM SNAPSHOT
-Verarbeitung.
Wenn Sie die Zieltabelle im Hive-Metastore deklarieren, werden zwei Datenstrukturen erstellt:
- Eine Ansicht mit dem Namen, der der Zieltabelle zugewiesen ist.
- Eine interne Sicherungstabelle, die von Delta Live Tables zum Verwalten der CDC-Verarbeitung verwendet wird. Diese Tabelle wird benannt, indem dem Namen der Zieltabelle
__apply_changes_storage_
vorangestellt wird.
Wenn Sie beispielsweise eine Zieltabelle mit dem Namen dlt_cdc_target
deklarieren, wird eine Ansicht mit dem Namen dlt_cdc_target
und eine Tabelle mit dem Namen __apply_changes_storage_dlt_cdc_target
im Metastore angezeigt. Durch das Erstellen einer Ansicht können Delta Live Tables die zusätzlichen Informationen (z. B. Tombstones und Versionen) herausfiltern, die für die Verarbeitung von nicht ordnungsgemäßen Daten erforderlich sind. Um die verarbeiteten Daten anzuzeigen, fragen Sie die Zielansicht ab. Da sich das Schema der __apply_changes_storage_
-Tabelle möglicherweise ändert, um zukünftige Features oder Verbesserungen zu unterstützen, sollten Sie die Tabelle nicht für die Produktionsverwendung abfragen. Wenn Sie der Tabelle Daten manuell hinzufügen, werden die Datensätze angenommen, bevor andere Änderungen vorgenommen werden, da die Versionsspalten fehlen.
Wenn eine Pipeline im Unity Catalog veröffentlicht wird, sind die internen Sicherungstabellen für Benutzer nicht zugänglich.