Verwenden des Delta Lake-Tabellenverlaufs
Jeder Vorgang, der eine Delta Lake-Tabelle ändert, hat die Erstellung einer neuen Tabellenversion zur Folge. Sie können Verlaufsinformationen nutzen, um Vorgänge zu überwachen, ein Rollback für eine Tabelle durchzuführen oder eine Tabelle zu einem bestimmten Zeitpunkt mittels Zeitreise abzufragen.
Hinweis
Databricks empfiehlt nicht, den Delta Lake-Tabellenverlauf als langfristige Sicherungslösung für die Datenarchivierung zu verwenden. Databricks empfiehlt, nur die letzten 7 Tage für Zeitreisevorgänge zu verwenden, es sei denn, Sie haben sowohl die Daten- als auch die Protokollaufbewahrungskonfigurationen auf einen größeren Wert festgelegt.
Abrufen des Delta-Tabellenverlaufs
Sie können Informationen zu den Vorgängen, zum Benutzer und zum Zeitstempel für jeden Schreibvorgang in eine Delta-Tabelle abrufen, indem Sie den history
-Befehl ausführen. Die Vorgänge werden in umgekehrter chronologischer Reihenfolge zurückgegeben.
Die Beibehaltung des Tabellenverlaufs wird durch die Tabelleneinstellung delta.logRetentionDuration
bestimmt, die standardmäßig auf 30 Tage festgelegt ist.
Hinweis
Zeitreisen und Tabellenverlauf werden durch unterschiedliche Aufbewahrungsschwellenwerte gesteuert. Weitere Informationen finden Sie unter Was ist die Delta Lake-Zeitreise?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Details zur Spark SQL-Syntax finden Sie unter DESCRIBE HISTORY.
Details zur Syntax von Scala, Java und Python finden Sie in der Dokumentation zur Delta Lake-API.
Der Katalog-Explorer bietet eine visuelle Darstellung dieser detaillierten Tabelleninformationen und des Verlaufs für Deltatabellen. Sie können auf die Registerkarte Verlauf klicken, um zusätzlich zu Tabellenschema und Beispieldaten den mit DESCRIBE HISTORY
angezeigten Tabellenverlauf zu sehen.
Verlaufsschema
Die Ausgabe des Vorgangs history
enthält die folgenden Spalten.
Column | Type | BESCHREIBUNG |
---|---|---|
version | lang | Die vom Vorgang generierte Tabellenversion. |
timestamp | timestamp | Zeitpunkt, zu dem diese Version committet wurde. |
userId | Zeichenfolge | ID des Benutzers, der den Vorgang ausgeführt hat. |
userName | Zeichenfolge | Name des Benutzers, der den Vorgang ausgeführt hat. |
operation | Zeichenfolge | Name des Vorgangs. |
operationParameters | map | Parameter des Vorgangs (z. B. Prädikate) |
Auftrag | struct | Details des Auftrags, der den Vorgang ausgeführt hat. |
Notebook | struct | Details des Notebooks, aus dem der Vorgang ausgeführt wurde. |
clusterId | Zeichenfolge | ID des Clusters, auf dem der Vorgang ausgeführt wurde. |
readVersion | lang | Version der Tabelle, die gelesen wurde, um den Schreibvorgang auszuführen. |
isolationLevel | Zeichenfolge | Für diesen Vorgang verwendete Isolationsstufe. |
isBlindAppend | boolean | Gibt an, ob dieser Vorgang Daten angefügt hat. |
operationMetrics | map | Metriken des Vorgangs (z. B. Anzahl von geänderten Zeilen und Dateien) |
UserMetadata | Zeichenfolge | Benutzerdefinierte Commit-Metadaten, wenn sie angegeben wurden |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Hinweis
- Einige der anderen Spalten sind nicht verfügbar, wenn Sie mithilfe der folgenden Methoden in eine Delta-Tabelle schreiben:
- Zukünftig hinzugefügte Spalten werden immer hinter der letzten Spalte hinzugefügt.
Vorgangsmetrikschlüssel
Der Vorgang history
gibt eine Auflistung von Vorgangsmetriken in der Spaltenzuordnung operationMetrics
zurück.
In den folgenden Tabellen sind die Zuordnungsschlüsseldefinitionen nach Vorgang aufgeführt.
Vorgang | Metrikname | BESCHREIBUNG |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Anzahl von geschriebenen Dateien. | |
numOutputBytes | Größe der geschriebenen Inhalte in Bytes. | |
numOutputRows | Anzahl von geschriebenen Zeilen. | |
STREAMING UPDATE | ||
numAddedFiles | Anzahl von hinzugefügten Dateien. | |
numRemovedFiles | Anzahl von entfernten Dateien. | |
numOutputRows | Anzahl von geschriebenen Zeilen. | |
numOutputBytes | Größe des Schreibzugriffs in Bytes. | |
DELETE | ||
numAddedFiles | Anzahl von hinzugefügten Dateien. Wird nicht angegeben, wenn Partitionen der Tabelle gelöscht werden. | |
numRemovedFiles | Anzahl von entfernten Dateien. | |
numDeletedRows | Anzahl von entfernten Zeilen. Wird nicht angegeben, wenn Partitionen der Tabelle gelöscht werden. | |
numCopiedRows | Anzahl der beim Löschen von Dateien kopierten Zeilen. | |
executionTimeMs | Benötigte Zeit für die Ausführung des gesamten Vorgangs. | |
scanTimeMs | Benötigte Zeit zum Überprüfen der Dateien auf Übereinstimmungen. | |
rewriteTimeMs | Benötigte Zeit zum erneuten Schreiben der übereinstimmenden Dateien. | |
TRUNCATE | ||
numRemovedFiles | Anzahl von entfernten Dateien. | |
executionTimeMs | Benötigte Zeit für die Ausführung des gesamten Vorgangs. | |
MERGE | ||
numSourceRows | Anzahl von Zeilen im Quelldatenrahmen (Quell-DataFrame). | |
numTargetRowsInserted | Anzahl der in die Zieltabelle eingefügten Zeilen. | |
numTargetRowsUpdated | Anzahl der in der Zieltabelle aktualisierten Zeilen. | |
numTargetRowsDeleted | Anzahl der in der Zieltabelle gelöschten Zeilen. | |
numTargetRowsCopied | Anzahl von kopierten Zielzeilen. | |
numOutputRows | Gesamtanzahl von ausgeschriebenen Zeilen. | |
numTargetFilesAdded | Anzahl von Dateien, die der Senke (Ziel) hinzugefügt wurden. | |
numTargetFilesRemoved | Anzahl von Dateien, die aus der Senke (Ziel) entfernt wurden. | |
executionTimeMs | Benötigte Zeit für die Ausführung des gesamten Vorgangs. | |
scanTimeMs | Benötigte Zeit zum Überprüfen der Dateien auf Übereinstimmungen. | |
rewriteTimeMs | Benötigte Zeit zum erneuten Schreiben der übereinstimmenden Dateien. | |
UPDATE | ||
numAddedFiles | Anzahl von hinzugefügten Dateien. | |
numRemovedFiles | Anzahl von entfernten Dateien. | |
numUpdatedRows | Anzahl von aktualisierten Zeilen. | |
numCopiedRows | Anzahl von Zeilen, die beim Aktualisieren von Dateien gerade kopiert wurden. | |
executionTimeMs | Benötigte Zeit für die Ausführung des gesamten Vorgangs. | |
scanTimeMs | Benötigte Zeit zum Überprüfen der Dateien auf Übereinstimmungen. | |
rewriteTimeMs | Benötigte Zeit zum erneuten Schreiben der übereinstimmenden Dateien. | |
FSCK | numRemovedFiles | Anzahl von entfernten Dateien. |
CONVERT | numConvertedFiles | Anzahl von Parquet-Dateien, die konvertiert wurden. |
OPTIMIZE | ||
numAddedFiles | Anzahl von hinzugefügten Dateien. | |
numRemovedFiles | Anzahl von optimierten Dateien. | |
numAddedBytes | Anzahl von Bytes, die nach Optimierung der Tabelle hinzugefügt wurden. | |
numRemovedBytes | Anzahl von entfernten Bytes. | |
MinFileSize | Größe der kleinsten Datei, nachdem die Tabelle optimiert wurde. | |
p25FileSize | Größe der 25. Perzentildatei, nachdem die Tabelle optimiert wurde. | |
p50FileSize | Mediandateigröße, nachdem die Tabelle optimiert wurde. | |
p75FileSize | Größe der 75. Perzentildatei, nachdem die Tabelle optimiert wurde. | |
maxFileSize | Größe der größten Datei, nachdem die Tabelle optimiert wurde. | |
CLONE | ||
sourceTableSize | Größe in Bytes der Quelltabelle in der geklonten Version. | |
sourceNumOfFiles | Anzahl von Dateien in der Quelltabelle in der geklonten Version. | |
numRemovedFiles | Anzahl von Dateien, die aus der Zieltabelle entfernt wurden, wenn eine vorherige Delta-Tabelle ersetzt wurde. | |
removedFilesSize | Gesamtgröße in Bytes der Dateien, die aus der Zieltabelle entfernt wurden, wenn eine vorherige Delta-Tabelle ersetzt wurde. | |
numCopiedFiles | Anzahl von Dateien, die an den neuen Speicherort kopiert wurden. „0“ für flache Klone. | |
copiedFilesSize | Gesamtgröße in Bytes der Dateien, die an den neuen Speicherort kopiert wurden. „0“ für flache Klone. | |
RESTORE | ||
tableSizeAfterRestore | Tabellengröße in Bytes nach der Wiederherstellung. | |
numOfFilesAfterRestore | Anzahl von Dateien in der Tabelle nach der Wiederherstellung. | |
numRemovedFiles | Anzahl von Dateien, die durch den Wiederherstellungsvorgang entfernt wurden. | |
numRestoredFiles | Anzahl von Dateien, die als Ergebnis der Wiederherstellung hinzugefügt wurden. | |
removedFilesSize | Größe in Bytes von Dateien, die durch die Wiederherstellung entfernt wurden. | |
restoredFilesSize | Größe in Bytes von Dateien, die durch die Wiederherstellung hinzugefügt wurden. | |
VACUUM | ||
numDeletedFiles | Anzahl von gelöschten Dateien. | |
numVacuumedDirectories | Anzahl von bereinigten Verzeichnissen. | |
numFilesToDelete | Anzahl von zu löschenden Dateien. |
Was ist die Delta Lake-Zeitreise?
Delta Lake unterstützt das Abfragen früherer Tabellenversionen basierend auf dem Zeitstempel oder der Tabellenversion (gemäß Aufzeichnung im Transaktionsprotokoll). Sie können Zeitreisen für Anwendungen wie die folgenden verwenden:
- Erneutes Erstellen von Analysen, Berichten oder Ausgaben (z. B. der Ausgabe eines Machine Learning-Modells). Dies könnte für das Debuggen oder die Überwachung nützlich sein, insbesondere in regulierten Branchen.
- Schreiben komplexer temporaler Abfragen.
- Beheben von Fehlern in Ihren Daten.
- Bereitstellen der Momentaufnahmenisolation für eine Reihe von Abfragen für sich schnell verändernde Tabellen.
Wichtig
Tabellenversionen, auf die mit Zeitreisen zugegriffen werden kann, werden durch eine Kombination aus dem Aufbewahrungsschwellenwert für Transaktionsprotokolldateien und der Häufigkeit und der angegebenen Aufbewahrung für VACUUM
-Vorgänge bestimmt. Wenn Sie VACUUM
täglich mit den Standardwerten ausführen, stehen 7 Tage Daten für Zeitreisen zur Verfügung.
Syntax für die Delta-Zeitreise
Sie fragen eine Delta-Tabelle mit Zeitreise ab, indem Sie eine Klausel nach der Spezifikation des Tabellennamens hinzufügen.
timestamp_expression
kann einen der folgenden Werte annehmen:'2018-10-18T22:15:12.013Z'
, d. h. eine Zeichenfolge, die in einen Zeitstempel umgewandelt werden kanncast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, d. h. eine Datumszeichenfolgecurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Jeder andere Ausdruck, der ein Zeitstempel ist oder in einen Zeitstempel umgewandelt werden kann
version
ist ein LONG-Wert, der aus der Ausgabe vonDESCRIBE HISTORY table_spec
abgerufen werden kann.
Weder timestamp_expression
noch version
können Unterabfragen sein.
Es werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert. Beispiel: "2019-01-01"
und "2019-01-01T00:00:00.000Z"
. Eine Beispielsyntax finden Sie im folgenden Code:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Sie können auch die Syntax @
verwenden, um den Zeitstempel oder die Version als Teil des Tabellennamens anzugeben. Der Zeitstempel muss im Format yyyyMMddHHmmssSSS
vorliegen. Sie können eine Version nach @
angeben, indem Sie der Version v
voranstellen. Eine Beispielsyntax finden Sie im folgenden Code:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Was sind Transaktionsprotokoll-Prüfpunkte?
Delta Lake erfasst Tabellenversionen als JSON-Dateien im Verzeichnis _delta_log
, das parallel zu Tabellendaten gespeichert wird. Zur Optimierung von Prüfpunktabfragen aggregiert Delta Lake Tabellenversionen in Parquet-Prüfpunktdateien, damit nicht alle JSON-Versionen des Tabellenverlaufs gelesen werden müssen. Azure Databricks optimiert die Prüfpunkthäufigkeit für Datengröße und Workload. Benutzer sollten nicht direkt mit Prüfpunkten interagieren müssen. Die Prüfpunkthäufigkeit kann sich ohne Ankündigung ändern.
Konfigurieren der Datenaufbewahrung für Zeitreiseabfragen
Um eine frühere Tabellenversion abzufragen, müssen Sie sowohl die Protokoll- als auch die Datendateien für diese Version beibehalten.
Datendateien werden gelöscht, wenn VACUUM
gegen eine Tabelle ausgeführt wird. Delta Lake verwaltet das automatische Entfernen von Protokolldateien nach dem Erstellen eines Prüfpunkts der Tabellenversionen.
Da gegen meisten Delta-Tabellen regelmäßig VACUUM
ausgeführt wird, sollten Point-in-Time-Abfragen den Aufbewahrungsschwellenwert für VACUUM
berücksichtigen, der standardmäßig 7 Tage beträgt.
Um den Schwellenwert für die Datenaufbewahrung für Delta-Tabellen zu erhöhen, müssen Sie die folgenden Tabelleneigenschaften konfigurieren:
delta.logRetentionDuration = "interval <interval>"
: Steuert, wie lange der Verlauf einer Tabelle aufbewahrt wird. Der Standardwert lautetinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: bestimmt den Schwellenwert, denVACUUM
verwendet, um Datendateien zu entfernen, auf die in der aktuellen Tabellenversion nicht mehr verwiesen wird. Der Standardwert lautetinterval 7 days
.
Sie können Delta-Eigenschaften während der Tabellenerstellung angeben oder mit einer ALTER TABLE
-Anweisung festlegen. Weitere Informationen finden Sie unter Referenz zu Delta-Tabelleneigenschaften.
Hinweis
Sie müssen beide Eigenschaften festlegen, um sicherzustellen, dass der Tabellenverlauf für Tabellen mit häufigen VACUUM
-Vorgängen länger beibehalten wird. Um beispielsweise auf historische Daten für 30 Tage zuzugreifen, legen Sie delta.deletedFileRetentionDuration = "interval 30 days"
fest (was der Standardeinstellung für delta.logRetentionDuration
entspricht).
Die Erhöhung des Schwellenwerts für die Datenaufbewahrung kann dazu führen, dass Ihre Speicherkosten steigen, da mehr Datendateien aufbewahrt werden.
Wiederherstellen eines früheren Zustands einer Delta-Tabelle
Mit dem Befehl RESTORE
können Sie den früheren Zustand einer Delta-Tabelle wiederherstellen. Eine Delta-Tabelle verwaltet intern ihre Verlaufsversionen, mit denen sie auf einen früheren Zustand wiederhergestellt werden kann.
Eine Version, die dem früheren Zustand entspricht, oder ein Zeitstempel von der Erstellung des früheren Zustands werden durch den Befehl RESTORE
als Optionen unterstützt.
Wichtig
- Sie können eine bereits wiederhergestellte Tabelle wiederherstellen.
- Sie können eine geklonte Tabelle wiederherstellen.
- Sie müssen
MODIFY
-Berechtigung für die zurzeit wiederhergestellte Tabelle haben. - Sie können eine Tabelle nicht auf eine ältere Version wiederherstellen, wenn die Datendateien manuell oder durch
vacuum
gelöscht wurden. Die teilweise Wiederherstellung auf diese Version ist weiterhin möglich, wennspark.sql.files.ignoreMissingFiles
auftrue
festgelegt wird. - Das Zeitstempelformat für die Wiederherstellung auf einen früheren Zustand lautet
yyyy-MM-dd HH:mm:ss
. Die Angabe nur einer „date(yyyy-MM-dd
)“-Zeichenfolge wird ebenfalls unterstützt.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Details zur Syntax finden Sie unter RESTORE.
Wichtig
Die Wiederherstellung wird als Datenänderungsvorgang betrachtet. Bei Delta Lake-Protokolleinträgen, die vom RESTORE
-Befehl hinzugefügt wurden, ist dataChange auf TRUE festgelegt. Wenn es eine nachgelagerte Anwendung gibt, z. B. einen strukturierten Streamingauftrag, der die Aktualisierungen einer Delta Lake-Tabelle verarbeitet, werden die vom Wiederherstellungsvorgang hinzugefügten Protokolleinträge zu Datenänderungen als neue Datenaktualisierungen betrachtet, und ihre Verarbeitung kann zu Datenduplikaten führen.
Beispiele:
Tabellenversion | Vorgang | Delta-Protokollaktualisierungen | Datensätze in Protokollaktualisierungen zu Datenänderungen |
---|---|---|---|
0 | INSERT | AddFile(/Pfad/zur/Datei-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/Pfad/zur/Datei-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/Pfad/zur/Datei-3, dataChange = false), RemoveFile(/Pfad/zur/Datei-1), RemoveFile(/Pfad/zur/Datei-2) | (Wenn bei der Optimierungskomprimierung keine Datensätze verarbeitet werden, erfolgen keine Änderungen an den Daten in der Tabelle.) |
3 | RESTORE(version=1) | RemoveFile(/Pfad/zur/Datei-3), AddFile(/Pfad/zur/Datei-1, dataChange = true), AddFile(/Pfad/zur/Datei-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
Im obigen Beispiel führt der RESTORE
-Befehl zu Aktualisierungen, die beim Lesen von Version 0 und 1 der Delta-Tabelle bereits sichtbar waren. Wenn diese Tabelle bei einer Streamingabfrage gelesen wird, werden diese Dateien als neu hinzugefügte Daten betrachtet und erneut verarbeitet.
Wiederherstellen von Metriken
RESTORE
meldet die folgenden Metriken als einzeiligen Datenrahmen (DataFrame), sobald der Vorgang abgeschlossen ist:
table_size_after_restore
: Die Größe der Tabelle nach der Wiederherstellung.num_of_files_after_restore
Die Anzahl von Dateien in der Tabelle nach der Wiederherstellung.num_removed_files
: Die Anzahl von Dateien, die aus der Tabelle entfernt (logisch gelöscht) wurden.num_restored_files
: Die Anzahl von Dateien, die aufgrund eines Rollbacks wiederhergestellt wurden.removed_files_size
: Die Gesamtgröße in Bytes der aus der Tabelle entfernten Dateien.restored_files_size
: Die Gesamtgröße in Bytes der wiederhergestellten Dateien.
Beispiele für die Verwendung von Delta Lake-Zeitreisen
Korrigieren versehentlicher Löschungen in einer Tabelle für den Benutzer
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Korrigieren versehentlicher falscher Updates für eine Tabelle:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Abfragen der Anzahl von Neukunden, die in der letzten Woche gewonnen wurden.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Wie finde ich die Version des letzten Commits in der Spark-Sitzung?
Um die Versionsnummer des letzten Commits zu erhalten, der von der aktuellen SparkSession
für alle Threads und alle Tabellen geschrieben wurde, fragen Sie die SQL-Konfiguration spark.databricks.delta.lastCommitVersionInSession
ab.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Wenn von der SparkSession
keine Commits durchgeführt wurden, wird beim Abfragen des Schlüssels ein leerer Wert zurückgegeben.
Hinweis
Wenn Sie dieselbe SparkSession
über mehrere Threads hinweg freigeben, ähnelt dies der Freigabe einer Variablen, ebenfalls über mehrere Threads hinweg. Möglicherweise kommt es zu Racebedingungen, da der Konfigurationswert gleichzeitig aktualisiert wird.