Utiliser l’historique des tables Delta Lake

Chaque opération qui modifie une table Delta Lake crée une nouvelle version de table. Vous pouvez utiliser des informations d’historique pour auditer des opérations, restaurer une table ou interroger une table à un moment spécifique en utilisant le voyage dans le temps.

Notes

Databricks ne vous recommande pas d’utiliser l’historique des tables Delta Lake comme solution de sauvegarde à long terme pour l’archivage des données. Databricks recommande d’utiliser uniquement les 7 derniers jours pour des opérations de voyage dans le temps, sauf si vous avez défini des configurations de conservation des données et des journaux sur une valeur supérieure.

Récupérer l’historique d’une table Delta

Vous pouvez récupérer des informations, notamment les opérations, l’utilisateur et le timestamp pour chaque écriture dans une table Delta en exécutant la commande history. Les opérations sont retournées dans l’ordre chronologique inverse.

La rétention de l’historique des tables est déterminée par le paramètre de table delta.logRetentionDuration, qui est de 30 jours par défaut.

Notes

Le voyage dans le temps et l’historique des tables sont contrôlés par des seuils de rétention distincts. Consultez Qu’est-ce que le voyage dans le temps Delta Lake ?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Pour plus d’informations sur la syntaxe de Spark SQL, consultez DESCRIBE HISTORY.

Pour plus d’informations sur les syntaxes Scala, Java et Python, consultez Documentation sur l’API Delta Lake.

Catalog Explorer fournit un aperçu visuel de ces informations et de l'historique détaillés des tables Delta. En plus du schéma de la table et des exemples de données, vous pouvez cliquer sur l’onglet historique pour afficher l’historique de la table qui s’affiche avec DESCRIBE HISTORY.

Schéma de l’historique

La sortie de l'opération history contient les colonnes suivantes.

Colonne Type Description
version long Version de la table générée par l’opération.
timestamp timestamp Moment de validation de cette version.
userId string ID de l’utilisateur qui a effectué l’opération.
userName string Nom de l’utilisateur qui a effectué l’opération.
opération string Nom de l’opération.
operationParameters map Paramètres de l’opération (par exemple, prédicats).
travail struct Détails du travail qui a effectué l’opération.
notebook struct Détails du notebook à partir duquel l’opération a été effectuée.
clusterId string ID du cluster sur lequel l’opération a été effectuée.
readVersion long Version de la table qui a été lue pour effectuer l’opération d’écriture.
isolationLevel string Niveau d’isolation utilisé pour cette opération.
isBlindAppend boolean Indique si cette opération a ajouté des données.
operationMetrics map Métriques de l’opération (par exemple, nombre de lignes et fichiers modifiés).
UserMetadata string Métadonnées de validation définies par l’utilisateur si elles ont été spécifiées.
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Remarque

Clés des métriques d’opération

L’opération history retourne une collection de métriques d’opération dans le mappage de colonnes operationMetrics.

Les tableaux suivants répertorient les définitions de clé de mappage par opération.

Opération Nom de métrique Description
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Nombre de fichiers écrits.
numOutputBytes Taille en octets du contenu écrit.
numOutputRows Nombre de lignes écrites.
STREAMING UPDATE
numAddedFiles Nombre de fichiers ajoutés
numRemovedFiles Nombre de fichiers supprimés.
numOutputRows Nombre de lignes écrites.
numOutputBytes Taille de l’écriture en octets.
Suppression
numAddedFiles Nombre de fichiers ajoutés Non fourni lors de la suppression des partitions de la table.
numRemovedFiles Nombre de fichiers supprimés.
numDeletedRows Nombre de lignes supprimées. Non fourni lors de la suppression des partitions de la table.
numCopiedRows Nombre de lignes copiées dans le processus de suppression des fichiers.
executionTimeMs Temps pris pour exécuter la totalité de l’opération.
scanTimeMs Temps pris pour analyser les fichiers à la recherche de correspondances.
rewriteTimeMs Temps pris pour réécrire les fichiers correspondants.
TRUNCATE
numRemovedFiles Nombre de fichiers supprimés.
executionTimeMs Temps pris pour exécuter la totalité de l’opération.
MERGE
numSourceRows Nombre de lignes dans la tramedonnées source.
numTargetRowsInserted Nombre de lignes insérées dans la table cible.
numTargetRowsUpdated Nombre de lignes mises à jour dans la table cible.
numTargetRowsDeleted Nombre de lignes supprimées dans la table cible.
numTargetRowsCopied Nombre de lignes cibles copiées.
numOutputRows Nombre total de lignes écrites.
numTargetFilesAdded Nombre de fichiers ajoutés au récepteur (cible).
numTargetFilesRemoved Nombre de fichiers supprimés du récepteur (cible).
executionTimeMs Temps pris pour exécuter la totalité de l’opération.
scanTimeMs Temps pris pour analyser les fichiers à la recherche de correspondances.
rewriteTimeMs Temps pris pour réécrire les fichiers correspondants.
UPDATE
numAddedFiles Nombre de fichiers ajoutés
numRemovedFiles Nombre de fichiers supprimés.
numUpdatedRows Nombre de lignes mises à jour.
numCopiedRows Nombre de lignes copiées dans le cadre du processus de mise à jour des fichiers.
executionTimeMs Temps pris pour exécuter la totalité de l’opération.
scanTimeMs Temps pris pour analyser les fichiers à la recherche de correspondances.
rewriteTimeMs Temps pris pour réécrire les fichiers correspondants.
FSCK numRemovedFiles Nombre de fichiers supprimés.
CONVERT numConvertedFiles Nombre de fichiers Parquet convertis.
OPTIMIZE
numAddedFiles Nombre de fichiers ajoutés
numRemovedFiles Nombre de fichiers optimisés.
numAddedBytes Nombre d’octets ajoutés après optimisation de la table.
numRemovedBytes Nombre d'octets supprimés.
minFileSize Taille du plus petit fichier après optimisation de la table.
p25FileSize Taille du fichier au 25e centile après optimisation de la table.
p50FileSize Taille du fichier médiane après optimisation de la table.
p75FileSize Taille du fichier au 75e centile après optimisation de la table.
maxFileSize Taille du fichier le plus grand après optimisation de la table.
CLONE
sourceTableSize Taille en octets de la table source au niveau de la version clonée.
sourceNumOfFiles Nombre de fichiers dans la table source au niveau de la version clonée.
numRemovedFiles Nombre de fichiers supprimés de la table cible si une table Delta précédente a été remplacée.
removedFilesSize Taille totale en octets des fichiers supprimés de la table cible si une table Delta précédente a été remplacée.
numCopiedFiles Nombre de fichiers copiés vers le nouvel emplacement. 0 pour les clones superficiels.
copiedFilesSize Taille totale en octets des fichiers qui ont été copiés vers le nouvel emplacement. 0 pour les clones superficiels.
RESTORE
tableSizeAfterRestore Taille de la table en octets après restauration.
numOfFilesAfterRestore Nombre de fichiers dans la table après restauration.
numRemovedFiles Nombre de fichiers supprimés par l’opération de restauration.
numRestoredFiles Nombre de fichiers ajoutés suite à la restauration.
removedFilesSize Taille en octets des fichiers supprimés par la restauration.
restoredFilesSize Taille en octets des fichiers supprimés par la restauration.
VACUUM
numDeletedFiles Nombre de fichiers supprimés
numVacuumedDirectories Nombre de répertoires vidés.
numFilesToDelete Nombre de fichiers à supprimer.

Qu’est-ce que le voyage dans le temps Delta Lake ?

Le voyage dans le temps Delta Lake prend en charge l’interrogation des versions de table précédentes en fonction du timestamp ou de la version de table (comme enregistré dans le journal des transactions). Vous pouvez utiliser le voyage dans le temps pour des applications telles que celles qui suivent :

  • Recréer des analyses, des rapports ou des sorties (par exemple, la sortie d’un modèle Machine Learning). Cela peut servir au débogage ou à l’audit, en particulier dans les secteurs réglementés.
  • Écriture de requêtes temporelles complexes.
  • Correction des erreurs dans vos données.
  • Fournir un instantané d’isolation pour un ensemble de requêtes des tables à variation rapide.

Important

Les versions de table accessibles avec un voyage dans le temps sont déterminées par une combinaison du seuil de rétention pour des fichiers journaux des transactions et de la fréquence et de la rétention spécifiée pour les opérations VACUUM. Si vous exécutez VACUUM quotidiennement avec les valeurs par défaut, 7 jours de données sont disponibles pour le voyage dans le temps.

Syntaxe du voyage dans le temps Delta

Vous interrogez une table Delta avec un voyage dans le temps en ajoutant une clause après la spécification du nom de table.

  • timestamp_expression peut être n’importe quel :
    • '2018-10-18T22:15:12.013Z', autrement dit, une chaîne qui peut être convertie en horodateur
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', autrement dit, une chaîne de date
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Toute autre expression qui est ou qui peut être castée en un timestamp
  • version est une valeur de type long qui peut être obtenue à partir de la sortie de DESCRIBE HISTORY table_spec.

Ni timestamp_expression ni version ne peuvent être des sous-requêtes.

Seules les chaînes de date ou timestamp sont acceptées. Par exemple, "2019-01-01" et "2019-01-01T00:00:00.000Z". Pour obtenir un exemple de syntaxe, consultez le code suivant :

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

Vous pouvez également utiliser la syntaxe @ pour spécifier le timestamp ou la version dans le cadre du nom de la table. Le timestamp doit être au format yyyyMMddHHmmssSSS. Vous pouvez spécifier une version après @ en ajoutant v à la version. Pour obtenir un exemple de syntaxe, consultez le code suivant :

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

En quoi consistent les points de contrôle du journal des transactions ?

Delta Lake enregistre les versions de table sous forme de fichiers JSON dans le répertoire _delta_log, qui est stocké en même temps que les données de table. Pour optimiser l’interrogation des points de contrôle, Delta Lake agrège les versions de table en fichiers de point de contrôle Parquet, ce qui évite d’avoir à lire toutes les versions JSON de l’historique des tables. Azure Databricks optimise la fréquence des points de contrôle pour la taille des données et la charge de travail. Les utilisateurs ne doivent pas avoir besoin d’interagir directement avec les points de contrôle. La fréquence du point de contrôle est susceptible d’être modifiée sans préavis.

Configurer la conservation des données pour des requêtes de voyage dans le temps

Pour interroger une version de table antérieure, vous devez conserver à la fois le journal et les fichiers de données pour cette version.

Les fichiers de données sont supprimés lorsque VACUUM s’exécute sur une table. Delta Lake gère automatiquement la suppression des fichiers journaux après le contrôle des versions de table.

Étant donné que la plupart des tables Delta demandent que VACUUM s’exécute régulièrement sur elles, les requêtes à un point dans le temps doivent respecter le seuil de rétention pour VACUUM, qui est de 7 jours par défaut.

Pour augmenter le seuil de conservation des données pour des tables Delta, vous devez configurer les propriétés de table suivantes :

  • delta.logRetentionDuration = "interval <interval>": contrôle la durée de conservation de l’historique d’une table. Par défaut, il s’agit de interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>" : détermine le seuil que VACUUM utilise pour supprimer des fichiers de données qui ne sont plus référencés dans la version actuelle de la table. Par défaut, il s’agit de interval 7 days.

Vous pouvez spécifier des propriétés Delta lors de la création d’une table ou les définir avec une instruction ALTER TABLE. Consultez Référence sur les propriétés de table Delta.

Notes

Vous devez définir ces deux propriétés pour veiller à ce que l’historique des tables soit conservé pendant une durée plus longue pour les tables avec des opérations VACUUM fréquentes. Par exemple, pour accéder à 30 jours de données historiques, définissez delta.deletedFileRetentionDuration = "interval 30 days" (qui correspond au paramètre par défaut pour delta.logRetentionDuration).

L’augmentation du seuil de conservation des données peut entraîner une augmentation de vos coûts de stockage, à mesure que d’autres fichiers de données sont gérés.

Restaurer une table Delta à un état antérieur

Vous pouvez restaurer une table Delta à son état antérieur à l’aide de la commande RESTORE. Une table Delta conserve en interne ses versions historiques qui permettent sa restauration à un état antérieur. La commande RESTORE prend en charge en tant qu’options une version correspondant à l’état antérieur ou un horodateur du moment de la création de l’état antérieur.

Important

  • Vous pouvez restaurer une table déjà restaurée.
  • Vous pouvez restaurer une table clonée.
  • Vous devez disposer d’une autorisation MODIFY sur la table en cours de restauration.
  • Vous ne pouvez pas restaurer une table vers une version antérieure où les fichiers de données ont été supprimés manuellement ou par vacuum. Une restauration partielle vers cette version reste possible si spark.sql.files.ignoreMissingFiles a la valeur true.
  • Le format de timestamp pour la restauration vers un état antérieur est yyyy-MM-dd HH:mm:ss. La fourniture d’une seule chaîne de type date (yyyy-MM-dd) est également prise en charge.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Pour plus d’informations sur la syntaxe, consultez RESTORE.

Important

La restauration est considérée comme une opération de modification des données. Les entrées de journal Delta Lake ajoutées par la commande RESTORE contiennent dataChange défini sur true. S’il existe une application en aval, comme un travail de Diffusion en continu structurée qui traite les mises à jour d’une table Delta Lake, les entrées du journal des modifications de données ajoutées par l’opération de restauration sont considérées comme de nouvelles mises à jour de données, et leur traitement peut entraîner des données en double.

Par exemple :

Version de la table Opération Mises à jour du journal Delta Enregistrements dans les mises à jour du journal des modifications de données
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (No records as Optimize compaction does not change the data in the table)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

Dans l’exemple précédent, la commande RESTORE génère des mises à jour qui ont déjà été vues lors de la lecture de la table Delta version 0 et 1. Si une requête de diffusion en continu a lu cette table, ces fichiers seront considérés comme des données nouvellement ajoutées et seront traités à nouveau.

Restaurer des métriques

RESTORE présente les métriques suivantes sous la forme d’une ligne unique de tramedonnées une fois l’opération terminée :

  • table_size_after_restore : taille de la table après restauration.

  • num_of_files_after_restore : nombre de fichiers dans la table après restauration.

  • num_removed_files : nombre de fichiers supprimés (logiquement) de la table.

  • num_restored_files : nombre de fichiers restaurés en raison d’une restauration.

  • removed_files_size : taille totale en octets des fichiers supprimés de la table.

  • restored_files_size : taille totale en octets des fichiers restaurés.

    Restaurer des exemples de métriques

Exemples d’utilisation du voyage dans le temps Delta Lake

  • Correction des suppressions accidentelles d’une table pour l’utilisateur 111 :

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Correction des mises à jour accidentelles incorrectes d’une table :

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Interroger le nombre de nouveaux clients ajoutés au cours de la semaine dernière.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Comment rechercher la version de dernier commit dans la session Spark ?

Pour obtenir le numéro de version de la dernière validation écrite par la SparkSession en cours sur l’ensemble des threads et des tables, interrogez la configuration SQL spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Si aucune validation n’a été effectuée par la session SparkSession, l’interrogation de la clé retourne une valeur vide.

Notes

Si vous partagez la même SparkSession entre plusieurs threads, cela est similaire au partage d’une variable. Vous risquez de rencontrer des situations de conflit d’accès car la valeur de configuration est mise à jour simultanément.