Verwenden des Delta Lake-Tabellenverlaufs

Jeder Vorgang, der eine Delta Lake-Tabelle ändert, hat die Erstellung einer neuen Tabellenversion zur Folge. Sie können Verlaufsinformationen nutzen, um Vorgänge zu überwachen, ein Rollback für eine Tabelle durchzuführen oder eine Tabelle zu einem bestimmten Zeitpunkt mittels Zeitreise abzufragen.

Hinweis

Databricks empfiehlt nicht, den Delta Lake-Tabellenverlauf als langfristige Sicherungslösung für die Datenarchivierung zu verwenden. Databricks empfiehlt, nur die letzten 7 Tage für Zeitreisevorgänge zu verwenden, es sei denn, Sie haben sowohl die Daten- als auch die Protokollaufbewahrungskonfigurationen auf einen größeren Wert festgelegt.

Abrufen des Delta-Tabellenverlaufs

Sie können Informationen zu den Vorgängen, zum Benutzer und zum Zeitstempel für jeden Schreibvorgang in eine Delta-Tabelle abrufen, indem Sie den history-Befehl ausführen. Die Vorgänge werden in umgekehrter chronologischer Reihenfolge zurückgegeben.

Die Beibehaltung des Tabellenverlaufs wird durch die Tabelleneinstellung delta.logRetentionDurationbestimmt, die standardmäßig auf 30 Tage festgelegt ist.

Hinweis

Zeitreisen und Tabellenverlauf werden durch unterschiedliche Aufbewahrungsschwellenwerte gesteuert. Weitere Informationen finden Sie unter Was ist die Delta Lake-Zeitreise?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Details zur Spark SQL-Syntax finden Sie unter DESCRIBE HISTORY.

Details zur Syntax von Scala, Java und Python finden Sie in der Dokumentation zur Delta Lake-API.

Der Katalog-Explorer bietet eine visuelle Darstellung dieser detaillierten Tabelleninformationen und des Verlaufs für Deltatabellen. Sie können auf die Registerkarte Verlauf klicken, um zusätzlich zu Tabellenschema und Beispieldaten den mit DESCRIBE HISTORY angezeigten Tabellenverlauf zu sehen.

Verlaufsschema

Die Ausgabe des Vorgangs history enthält die folgenden Spalten.

Column Type BESCHREIBUNG
version lang Die vom Vorgang generierte Tabellenversion.
timestamp timestamp Zeitpunkt, zu dem diese Version committet wurde.
userId Zeichenfolge ID des Benutzers, der den Vorgang ausgeführt hat.
userName Zeichenfolge Name des Benutzers, der den Vorgang ausgeführt hat.
operation Zeichenfolge Name des Vorgangs.
operationParameters map Parameter des Vorgangs (z. B. Prädikate)
Auftrag struct Details des Auftrags, der den Vorgang ausgeführt hat.
Notebook struct Details des Notebooks, aus dem der Vorgang ausgeführt wurde.
clusterId Zeichenfolge ID des Clusters, auf dem der Vorgang ausgeführt wurde.
readVersion lang Version der Tabelle, die gelesen wurde, um den Schreibvorgang auszuführen.
isolationLevel Zeichenfolge Für diesen Vorgang verwendete Isolationsstufe.
isBlindAppend boolean Gibt an, ob dieser Vorgang Daten angefügt hat.
operationMetrics map Metriken des Vorgangs (z. B. Anzahl von geänderten Zeilen und Dateien)
UserMetadata Zeichenfolge Benutzerdefinierte Commit-Metadaten, wenn sie angegeben wurden
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Hinweis

Vorgangsmetrikschlüssel

Der Vorgang history gibt eine Auflistung von Vorgangsmetriken in der Spaltenzuordnung operationMetrics zurück.

In den folgenden Tabellen sind die Zuordnungsschlüsseldefinitionen nach Vorgang aufgeführt.

Vorgang Metrikname BESCHREIBUNG
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Anzahl von geschriebenen Dateien.
numOutputBytes Größe der geschriebenen Inhalte in Bytes.
numOutputRows Anzahl von geschriebenen Zeilen.
STREAMING UPDATE
numAddedFiles Anzahl von hinzugefügten Dateien.
numRemovedFiles Anzahl von entfernten Dateien.
numOutputRows Anzahl von geschriebenen Zeilen.
numOutputBytes Größe des Schreibzugriffs in Bytes.
DELETE
numAddedFiles Anzahl von hinzugefügten Dateien. Wird nicht angegeben, wenn Partitionen der Tabelle gelöscht werden.
numRemovedFiles Anzahl von entfernten Dateien.
numDeletedRows Anzahl von entfernten Zeilen. Wird nicht angegeben, wenn Partitionen der Tabelle gelöscht werden.
numCopiedRows Anzahl der beim Löschen von Dateien kopierten Zeilen.
executionTimeMs Benötigte Zeit für die Ausführung des gesamten Vorgangs.
scanTimeMs Benötigte Zeit zum Überprüfen der Dateien auf Übereinstimmungen.
rewriteTimeMs Benötigte Zeit zum erneuten Schreiben der übereinstimmenden Dateien.
TRUNCATE
numRemovedFiles Anzahl von entfernten Dateien.
executionTimeMs Benötigte Zeit für die Ausführung des gesamten Vorgangs.
MERGE
numSourceRows Anzahl von Zeilen im Quelldatenrahmen (Quell-DataFrame).
numTargetRowsInserted Anzahl der in die Zieltabelle eingefügten Zeilen.
numTargetRowsUpdated Anzahl der in der Zieltabelle aktualisierten Zeilen.
numTargetRowsDeleted Anzahl der in der Zieltabelle gelöschten Zeilen.
numTargetRowsCopied Anzahl von kopierten Zielzeilen.
numOutputRows Gesamtanzahl von ausgeschriebenen Zeilen.
numTargetFilesAdded Anzahl von Dateien, die der Senke (Ziel) hinzugefügt wurden.
numTargetFilesRemoved Anzahl von Dateien, die aus der Senke (Ziel) entfernt wurden.
executionTimeMs Benötigte Zeit für die Ausführung des gesamten Vorgangs.
scanTimeMs Benötigte Zeit zum Überprüfen der Dateien auf Übereinstimmungen.
rewriteTimeMs Benötigte Zeit zum erneuten Schreiben der übereinstimmenden Dateien.
UPDATE
numAddedFiles Anzahl von hinzugefügten Dateien.
numRemovedFiles Anzahl von entfernten Dateien.
numUpdatedRows Anzahl von aktualisierten Zeilen.
numCopiedRows Anzahl von Zeilen, die beim Aktualisieren von Dateien gerade kopiert wurden.
executionTimeMs Benötigte Zeit für die Ausführung des gesamten Vorgangs.
scanTimeMs Benötigte Zeit zum Überprüfen der Dateien auf Übereinstimmungen.
rewriteTimeMs Benötigte Zeit zum erneuten Schreiben der übereinstimmenden Dateien.
FSCK numRemovedFiles Anzahl von entfernten Dateien.
CONVERT numConvertedFiles Anzahl von Parquet-Dateien, die konvertiert wurden.
OPTIMIZE
numAddedFiles Anzahl von hinzugefügten Dateien.
numRemovedFiles Anzahl von optimierten Dateien.
numAddedBytes Anzahl von Bytes, die nach Optimierung der Tabelle hinzugefügt wurden.
numRemovedBytes Anzahl von entfernten Bytes.
MinFileSize Größe der kleinsten Datei, nachdem die Tabelle optimiert wurde.
p25FileSize Größe der 25. Perzentildatei, nachdem die Tabelle optimiert wurde.
p50FileSize Mediandateigröße, nachdem die Tabelle optimiert wurde.
p75FileSize Größe der 75. Perzentildatei, nachdem die Tabelle optimiert wurde.
maxFileSize Größe der größten Datei, nachdem die Tabelle optimiert wurde.
CLONE
sourceTableSize Größe in Bytes der Quelltabelle in der geklonten Version.
sourceNumOfFiles Anzahl von Dateien in der Quelltabelle in der geklonten Version.
numRemovedFiles Anzahl von Dateien, die aus der Zieltabelle entfernt wurden, wenn eine vorherige Delta-Tabelle ersetzt wurde.
removedFilesSize Gesamtgröße in Bytes der Dateien, die aus der Zieltabelle entfernt wurden, wenn eine vorherige Delta-Tabelle ersetzt wurde.
numCopiedFiles Anzahl von Dateien, die an den neuen Speicherort kopiert wurden. „0“ für flache Klone.
copiedFilesSize Gesamtgröße in Bytes der Dateien, die an den neuen Speicherort kopiert wurden. „0“ für flache Klone.
RESTORE
tableSizeAfterRestore Tabellengröße in Bytes nach der Wiederherstellung.
numOfFilesAfterRestore Anzahl von Dateien in der Tabelle nach der Wiederherstellung.
numRemovedFiles Anzahl von Dateien, die durch den Wiederherstellungsvorgang entfernt wurden.
numRestoredFiles Anzahl von Dateien, die als Ergebnis der Wiederherstellung hinzugefügt wurden.
removedFilesSize Größe in Bytes von Dateien, die durch die Wiederherstellung entfernt wurden.
restoredFilesSize Größe in Bytes von Dateien, die durch die Wiederherstellung hinzugefügt wurden.
VACUUM
numDeletedFiles Anzahl von gelöschten Dateien.
numVacuumedDirectories Anzahl von bereinigten Verzeichnissen.
numFilesToDelete Anzahl von zu löschenden Dateien.

Was ist die Delta Lake-Zeitreise?

Delta Lake unterstützt das Abfragen früherer Tabellenversionen basierend auf dem Zeitstempel oder der Tabellenversion (gemäß Aufzeichnung im Transaktionsprotokoll). Sie können Zeitreisen für Anwendungen wie die folgenden verwenden:

  • Erneutes Erstellen von Analysen, Berichten oder Ausgaben (z. B. der Ausgabe eines Machine Learning-Modells). Dies könnte für das Debuggen oder die Überwachung nützlich sein, insbesondere in regulierten Branchen.
  • Schreiben komplexer temporaler Abfragen.
  • Beheben von Fehlern in Ihren Daten.
  • Bereitstellen der Momentaufnahmenisolation für eine Reihe von Abfragen für sich schnell verändernde Tabellen.

Wichtig

Tabellenversionen, auf die mit Zeitreisen zugegriffen werden kann, werden durch eine Kombination aus dem Aufbewahrungsschwellenwert für Transaktionsprotokolldateien und der Häufigkeit und der angegebenen Aufbewahrung für VACUUM-Vorgänge bestimmt. Wenn Sie VACUUM täglich mit den Standardwerten ausführen, stehen 7 Tage Daten für Zeitreisen zur Verfügung.

Syntax für die Delta-Zeitreise

Sie fragen eine Delta-Tabelle mit Zeitreise ab, indem Sie eine Klausel nach der Spezifikation des Tabellennamens hinzufügen.

  • timestamp_expression kann einen der folgenden Werte annehmen:
    • '2018-10-18T22:15:12.013Z', d. h. eine Zeichenfolge, die in einen Zeitstempel umgewandelt werden kann
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', d. h. eine Datumszeichenfolge
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Jeder andere Ausdruck, der ein Zeitstempel ist oder in einen Zeitstempel umgewandelt werden kann
  • version ist ein LONG-Wert, der aus der Ausgabe von DESCRIBE HISTORY table_spec abgerufen werden kann.

Weder timestamp_expression noch version können Unterabfragen sein.

Es werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert. Beispiel: "2019-01-01" und "2019-01-01T00:00:00.000Z". Eine Beispielsyntax finden Sie im folgenden Code:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

Sie können auch die Syntax @ verwenden, um den Zeitstempel oder die Version als Teil des Tabellennamens anzugeben. Der Zeitstempel muss im Format yyyyMMddHHmmssSSS vorliegen. Sie können eine Version nach @ angeben, indem Sie der Version v voranstellen. Eine Beispielsyntax finden Sie im folgenden Code:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

Was sind Transaktionsprotokoll-Prüfpunkte?

Delta Lake erfasst Tabellenversionen als JSON-Dateien im Verzeichnis _delta_log, das parallel zu Tabellendaten gespeichert wird. Zur Optimierung von Prüfpunktabfragen aggregiert Delta Lake Tabellenversionen in Parquet-Prüfpunktdateien, damit nicht alle JSON-Versionen des Tabellenverlaufs gelesen werden müssen. Azure Databricks optimiert die Prüfpunkthäufigkeit für Datengröße und Workload. Benutzer sollten nicht direkt mit Prüfpunkten interagieren müssen. Die Prüfpunkthäufigkeit kann sich ohne Ankündigung ändern.

Konfigurieren der Datenaufbewahrung für Zeitreiseabfragen

Um eine frühere Tabellenversion abzufragen, müssen Sie sowohl die Protokoll- als auch die Datendateien für diese Version beibehalten.

Datendateien werden gelöscht, wenn VACUUM gegen eine Tabelle ausgeführt wird. Delta Lake verwaltet das automatische Entfernen von Protokolldateien nach dem Erstellen eines Prüfpunkts der Tabellenversionen.

Da gegen meisten Delta-Tabellen regelmäßig VACUUM ausgeführt wird, sollten Point-in-Time-Abfragen den Aufbewahrungsschwellenwert für VACUUMberücksichtigen, der standardmäßig 7 Tage beträgt.

Um den Schwellenwert für die Datenaufbewahrung für Delta-Tabellen zu erhöhen, müssen Sie die folgenden Tabelleneigenschaften konfigurieren:

  • delta.logRetentionDuration = "interval <interval>": Steuert, wie lange der Verlauf einer Tabelle aufbewahrt wird. Der Standardwert lautet interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": bestimmt den Schwellenwert, den VACUUM verwendet, um Datendateien zu entfernen, auf die in der aktuellen Tabellenversion nicht mehr verwiesen wird. Der Standardwert lautet interval 7 days.

Sie können Delta-Eigenschaften während der Tabellenerstellung angeben oder mit einer ALTER TABLE-Anweisung festlegen. Weitere Informationen finden Sie unter Referenz zu Delta-Tabelleneigenschaften.

Hinweis

Sie müssen beide Eigenschaften festlegen, um sicherzustellen, dass der Tabellenverlauf für Tabellen mit häufigen VACUUM-Vorgängen länger beibehalten wird. Um beispielsweise auf historische Daten für 30 Tage zuzugreifen, legen Sie delta.deletedFileRetentionDuration = "interval 30 days" fest (was der Standardeinstellung für delta.logRetentionDuration entspricht).

Die Erhöhung des Schwellenwerts für die Datenaufbewahrung kann dazu führen, dass Ihre Speicherkosten steigen, da mehr Datendateien aufbewahrt werden.

Wiederherstellen eines früheren Zustands einer Delta-Tabelle

Mit dem Befehl RESTORE können Sie den früheren Zustand einer Delta-Tabelle wiederherstellen. Eine Delta-Tabelle verwaltet intern ihre Verlaufsversionen, mit denen sie auf einen früheren Zustand wiederhergestellt werden kann. Eine Version, die dem früheren Zustand entspricht, oder ein Zeitstempel von der Erstellung des früheren Zustands werden durch den Befehl RESTORE als Optionen unterstützt.

Wichtig

  • Sie können eine bereits wiederhergestellte Tabelle wiederherstellen.
  • Sie können eine geklonte Tabelle wiederherstellen.
  • Sie müssen MODIFY-Berechtigung für die zurzeit wiederhergestellte Tabelle haben.
  • Sie können eine Tabelle nicht auf eine ältere Version wiederherstellen, wenn die Datendateien manuell oder durch vacuum gelöscht wurden. Die teilweise Wiederherstellung auf diese Version ist weiterhin möglich, wenn spark.sql.files.ignoreMissingFiles auf true festgelegt wird.
  • Das Zeitstempelformat für die Wiederherstellung auf einen früheren Zustand lautet yyyy-MM-dd HH:mm:ss. Die Angabe nur einer „date(yyyy-MM-dd)“-Zeichenfolge wird ebenfalls unterstützt.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Details zur Syntax finden Sie unter RESTORE.

Wichtig

Die Wiederherstellung wird als Datenänderungsvorgang betrachtet. Bei Delta Lake-Protokolleinträgen, die vom RESTORE-Befehl hinzugefügt wurden, ist dataChange auf TRUE festgelegt. Wenn es eine nachgelagerte Anwendung gibt, z. B. einen strukturierten Streamingauftrag, der die Aktualisierungen einer Delta Lake-Tabelle verarbeitet, werden die vom Wiederherstellungsvorgang hinzugefügten Protokolleinträge zu Datenänderungen als neue Datenaktualisierungen betrachtet, und ihre Verarbeitung kann zu Datenduplikaten führen.

Beispiele:

Tabellenversion Vorgang Delta-Protokollaktualisierungen Datensätze in Protokollaktualisierungen zu Datenänderungen
0 INSERT AddFile(/Pfad/zur/Datei-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/Pfad/zur/Datei-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/Pfad/zur/Datei-3, dataChange = false), RemoveFile(/Pfad/zur/Datei-1), RemoveFile(/Pfad/zur/Datei-2) (Wenn bei der Optimierungskomprimierung keine Datensätze verarbeitet werden, erfolgen keine Änderungen an den Daten in der Tabelle.)
3 RESTORE(version=1) RemoveFile(/Pfad/zur/Datei-3), AddFile(/Pfad/zur/Datei-1, dataChange = true), AddFile(/Pfad/zur/Datei-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

Im obigen Beispiel führt der RESTORE-Befehl zu Aktualisierungen, die beim Lesen von Version 0 und 1 der Delta-Tabelle bereits sichtbar waren. Wenn diese Tabelle bei einer Streamingabfrage gelesen wird, werden diese Dateien als neu hinzugefügte Daten betrachtet und erneut verarbeitet.

Wiederherstellen von Metriken

RESTORE meldet die folgenden Metriken als einzeiligen Datenrahmen (DataFrame), sobald der Vorgang abgeschlossen ist:

  • table_size_after_restore: Die Größe der Tabelle nach der Wiederherstellung.

  • num_of_files_after_restore Die Anzahl von Dateien in der Tabelle nach der Wiederherstellung.

  • num_removed_files: Die Anzahl von Dateien, die aus der Tabelle entfernt (logisch gelöscht) wurden.

  • num_restored_files: Die Anzahl von Dateien, die aufgrund eines Rollbacks wiederhergestellt wurden.

  • removed_files_size: Die Gesamtgröße in Bytes der aus der Tabelle entfernten Dateien.

  • restored_files_size: Die Gesamtgröße in Bytes der wiederhergestellten Dateien.

    Beispiel der Wiederherstellung von Metriken

Beispiele für die Verwendung von Delta Lake-Zeitreisen

  • Korrigieren versehentlicher Löschungen in einer Tabelle für den Benutzer 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Korrigieren versehentlicher falscher Updates für eine Tabelle:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Abfragen der Anzahl von Neukunden, die in der letzten Woche gewonnen wurden.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Wie finde ich die Version des letzten Commits in der Spark-Sitzung?

Um die Versionsnummer des letzten Commits zu erhalten, der von der aktuellen SparkSession für alle Threads und alle Tabellen geschrieben wurde, fragen Sie die SQL-Konfiguration spark.databricks.delta.lastCommitVersionInSession ab.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Wenn von der SparkSession keine Commits durchgeführt wurden, wird beim Abfragen des Schlüssels ein leerer Wert zurückgegeben.

Hinweis

Wenn Sie dieselbe SparkSession über mehrere Threads hinweg freigeben, ähnelt dies der Freigabe einer Variablen, ebenfalls über mehrere Threads hinweg. Möglicherweise kommt es zu Racebedingungen, da der Konfigurationswert gleichzeitig aktualisiert wird.