Utiliser l’historique des tables Delta Lake
Chaque opération qui modifie une table Delta Lake crée une nouvelle version de table. Vous pouvez utiliser des informations d’historique pour auditer des opérations, restaurer une table ou interroger une table à un moment spécifique en utilisant le voyage dans le temps.
Notes
Databricks ne vous recommande pas d’utiliser l’historique des tables Delta Lake comme solution de sauvegarde à long terme pour l’archivage des données. Databricks recommande d’utiliser uniquement les 7 derniers jours pour des opérations de voyage dans le temps, sauf si vous avez défini des configurations de conservation des données et des journaux sur une valeur supérieure.
Récupérer l’historique d’une table Delta
Vous pouvez récupérer des informations, notamment les opérations, l’utilisateur et le timestamp pour chaque écriture dans une table Delta en exécutant la commande history
. Les opérations sont retournées dans l’ordre chronologique inverse.
La rétention de l’historique des tables est déterminée par le paramètre de table delta.logRetentionDuration
, qui est de 30 jours par défaut.
Notes
Le voyage dans le temps et l’historique des tables sont contrôlés par des seuils de rétention distincts. Consultez Qu’est-ce que le voyage dans le temps Delta Lake ?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Pour plus d’informations sur la syntaxe de Spark SQL, consultez DESCRIBE HISTORY.
Pour plus d’informations sur les syntaxes Scala, Java et Python, consultez Documentation sur l’API Delta Lake.
Catalog Explorer fournit un aperçu visuel de ces informations et de l'historique détaillés des tables Delta. En plus du schéma de la table et des exemples de données, vous pouvez cliquer sur l’onglet historique pour afficher l’historique de la table qui s’affiche avec DESCRIBE HISTORY
.
Schéma de l’historique
La sortie de l'opération history
contient les colonnes suivantes.
Colonne | Type | Description |
---|---|---|
version | long | Version de la table générée par l’opération. |
timestamp | timestamp | Moment de validation de cette version. |
userId | string | ID de l’utilisateur qui a effectué l’opération. |
userName | string | Nom de l’utilisateur qui a effectué l’opération. |
opération | string | Nom de l’opération. |
operationParameters | map | Paramètres de l’opération (par exemple, prédicats). |
travail | struct | Détails du travail qui a effectué l’opération. |
notebook | struct | Détails du notebook à partir duquel l’opération a été effectuée. |
clusterId | string | ID du cluster sur lequel l’opération a été effectuée. |
readVersion | long | Version de la table qui a été lue pour effectuer l’opération d’écriture. |
isolationLevel | string | Niveau d’isolation utilisé pour cette opération. |
isBlindAppend | boolean | Indique si cette opération a ajouté des données. |
operationMetrics | map | Métriques de l’opération (par exemple, nombre de lignes et fichiers modifiés). |
UserMetadata | string | Métadonnées de validation définies par l’utilisateur si elles ont été spécifiées. |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Remarque
- Quelques-unes des autres colonnes ne sont pas disponibles si vous écrivez dans une table Delta à l’aide des méthodes suivantes :
- Les colonnes ajoutées à l’avenir seront toujours ajoutées après la dernière colonne.
Clés des métriques d’opération
L’opération history
retourne une collection de métriques d’opération dans le mappage de colonnes operationMetrics
.
Les tableaux suivants répertorient les définitions de clé de mappage par opération.
Opération | Nom de métrique | Description |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Nombre de fichiers écrits. | |
numOutputBytes | Taille en octets du contenu écrit. | |
numOutputRows | Nombre de lignes écrites. | |
STREAMING UPDATE | ||
numAddedFiles | Nombre de fichiers ajoutés | |
numRemovedFiles | Nombre de fichiers supprimés. | |
numOutputRows | Nombre de lignes écrites. | |
numOutputBytes | Taille de l’écriture en octets. | |
Suppression | ||
numAddedFiles | Nombre de fichiers ajoutés Non fourni lors de la suppression des partitions de la table. | |
numRemovedFiles | Nombre de fichiers supprimés. | |
numDeletedRows | Nombre de lignes supprimées. Non fourni lors de la suppression des partitions de la table. | |
numCopiedRows | Nombre de lignes copiées dans le processus de suppression des fichiers. | |
executionTimeMs | Temps pris pour exécuter la totalité de l’opération. | |
scanTimeMs | Temps pris pour analyser les fichiers à la recherche de correspondances. | |
rewriteTimeMs | Temps pris pour réécrire les fichiers correspondants. | |
TRUNCATE | ||
numRemovedFiles | Nombre de fichiers supprimés. | |
executionTimeMs | Temps pris pour exécuter la totalité de l’opération. | |
MERGE | ||
numSourceRows | Nombre de lignes dans la tramedonnées source. | |
numTargetRowsInserted | Nombre de lignes insérées dans la table cible. | |
numTargetRowsUpdated | Nombre de lignes mises à jour dans la table cible. | |
numTargetRowsDeleted | Nombre de lignes supprimées dans la table cible. | |
numTargetRowsCopied | Nombre de lignes cibles copiées. | |
numOutputRows | Nombre total de lignes écrites. | |
numTargetFilesAdded | Nombre de fichiers ajoutés au récepteur (cible). | |
numTargetFilesRemoved | Nombre de fichiers supprimés du récepteur (cible). | |
executionTimeMs | Temps pris pour exécuter la totalité de l’opération. | |
scanTimeMs | Temps pris pour analyser les fichiers à la recherche de correspondances. | |
rewriteTimeMs | Temps pris pour réécrire les fichiers correspondants. | |
UPDATE | ||
numAddedFiles | Nombre de fichiers ajoutés | |
numRemovedFiles | Nombre de fichiers supprimés. | |
numUpdatedRows | Nombre de lignes mises à jour. | |
numCopiedRows | Nombre de lignes copiées dans le cadre du processus de mise à jour des fichiers. | |
executionTimeMs | Temps pris pour exécuter la totalité de l’opération. | |
scanTimeMs | Temps pris pour analyser les fichiers à la recherche de correspondances. | |
rewriteTimeMs | Temps pris pour réécrire les fichiers correspondants. | |
FSCK | numRemovedFiles | Nombre de fichiers supprimés. |
CONVERT | numConvertedFiles | Nombre de fichiers Parquet convertis. |
OPTIMIZE | ||
numAddedFiles | Nombre de fichiers ajoutés | |
numRemovedFiles | Nombre de fichiers optimisés. | |
numAddedBytes | Nombre d’octets ajoutés après optimisation de la table. | |
numRemovedBytes | Nombre d'octets supprimés. | |
minFileSize | Taille du plus petit fichier après optimisation de la table. | |
p25FileSize | Taille du fichier au 25e centile après optimisation de la table. | |
p50FileSize | Taille du fichier médiane après optimisation de la table. | |
p75FileSize | Taille du fichier au 75e centile après optimisation de la table. | |
maxFileSize | Taille du fichier le plus grand après optimisation de la table. | |
CLONE | ||
sourceTableSize | Taille en octets de la table source au niveau de la version clonée. | |
sourceNumOfFiles | Nombre de fichiers dans la table source au niveau de la version clonée. | |
numRemovedFiles | Nombre de fichiers supprimés de la table cible si une table Delta précédente a été remplacée. | |
removedFilesSize | Taille totale en octets des fichiers supprimés de la table cible si une table Delta précédente a été remplacée. | |
numCopiedFiles | Nombre de fichiers copiés vers le nouvel emplacement. 0 pour les clones superficiels. | |
copiedFilesSize | Taille totale en octets des fichiers qui ont été copiés vers le nouvel emplacement. 0 pour les clones superficiels. | |
RESTORE | ||
tableSizeAfterRestore | Taille de la table en octets après restauration. | |
numOfFilesAfterRestore | Nombre de fichiers dans la table après restauration. | |
numRemovedFiles | Nombre de fichiers supprimés par l’opération de restauration. | |
numRestoredFiles | Nombre de fichiers ajoutés suite à la restauration. | |
removedFilesSize | Taille en octets des fichiers supprimés par la restauration. | |
restoredFilesSize | Taille en octets des fichiers supprimés par la restauration. | |
VACUUM | ||
numDeletedFiles | Nombre de fichiers supprimés | |
numVacuumedDirectories | Nombre de répertoires vidés. | |
numFilesToDelete | Nombre de fichiers à supprimer. |
Qu’est-ce que le voyage dans le temps Delta Lake ?
Le voyage dans le temps Delta Lake prend en charge l’interrogation des versions de table précédentes en fonction du timestamp ou de la version de table (comme enregistré dans le journal des transactions). Vous pouvez utiliser le voyage dans le temps pour des applications telles que celles qui suivent :
- Recréer des analyses, des rapports ou des sorties (par exemple, la sortie d’un modèle Machine Learning). Cela peut servir au débogage ou à l’audit, en particulier dans les secteurs réglementés.
- Écriture de requêtes temporelles complexes.
- Correction des erreurs dans vos données.
- Fournir un instantané d’isolation pour un ensemble de requêtes des tables à variation rapide.
Important
Les versions de table accessibles avec un voyage dans le temps sont déterminées par une combinaison du seuil de rétention pour des fichiers journaux des transactions et de la fréquence et de la rétention spécifiée pour les opérations VACUUM
. Si vous exécutez VACUUM
quotidiennement avec les valeurs par défaut, 7 jours de données sont disponibles pour le voyage dans le temps.
Syntaxe du voyage dans le temps Delta
Vous interrogez une table Delta avec un voyage dans le temps en ajoutant une clause après la spécification du nom de table.
timestamp_expression
peut être n’importe quel :'2018-10-18T22:15:12.013Z'
, autrement dit, une chaîne qui peut être convertie en horodateurcast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, autrement dit, une chaîne de datecurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Toute autre expression qui est ou qui peut être castée en un timestamp
version
est une valeur de type long qui peut être obtenue à partir de la sortie deDESCRIBE HISTORY table_spec
.
Ni timestamp_expression
ni version
ne peuvent être des sous-requêtes.
Seules les chaînes de date ou timestamp sont acceptées. Par exemple, "2019-01-01"
et "2019-01-01T00:00:00.000Z"
. Pour obtenir un exemple de syntaxe, consultez le code suivant :
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Vous pouvez également utiliser la syntaxe @
pour spécifier le timestamp ou la version dans le cadre du nom de la table. Le timestamp doit être au format yyyyMMddHHmmssSSS
. Vous pouvez spécifier une version après @
en ajoutant v
à la version. Pour obtenir un exemple de syntaxe, consultez le code suivant :
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
En quoi consistent les points de contrôle du journal des transactions ?
Delta Lake enregistre les versions de table sous forme de fichiers JSON dans le répertoire _delta_log
, qui est stocké en même temps que les données de table. Pour optimiser l’interrogation des points de contrôle, Delta Lake agrège les versions de table en fichiers de point de contrôle Parquet, ce qui évite d’avoir à lire toutes les versions JSON de l’historique des tables. Azure Databricks optimise la fréquence des points de contrôle pour la taille des données et la charge de travail. Les utilisateurs ne doivent pas avoir besoin d’interagir directement avec les points de contrôle. La fréquence du point de contrôle est susceptible d’être modifiée sans préavis.
Configurer la conservation des données pour des requêtes de voyage dans le temps
Pour interroger une version de table antérieure, vous devez conserver à la fois le journal et les fichiers de données pour cette version.
Les fichiers de données sont supprimés lorsque VACUUM
s’exécute sur une table. Delta Lake gère automatiquement la suppression des fichiers journaux après le contrôle des versions de table.
Étant donné que la plupart des tables Delta demandent que VACUUM
s’exécute régulièrement sur elles, les requêtes à un point dans le temps doivent respecter le seuil de rétention pour VACUUM
, qui est de 7 jours par défaut.
Pour augmenter le seuil de conservation des données pour des tables Delta, vous devez configurer les propriétés de table suivantes :
delta.logRetentionDuration = "interval <interval>"
: contrôle la durée de conservation de l’historique d’une table. Par défaut, il s’agit deinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: détermine le seuil queVACUUM
utilise pour supprimer des fichiers de données qui ne sont plus référencés dans la version actuelle de la table. Par défaut, il s’agit deinterval 7 days
.
Vous pouvez spécifier des propriétés Delta lors de la création d’une table ou les définir avec une instruction ALTER TABLE
. Consultez Référence sur les propriétés de table Delta.
Notes
Vous devez définir ces deux propriétés pour veiller à ce que l’historique des tables soit conservé pendant une durée plus longue pour les tables avec des opérations VACUUM
fréquentes. Par exemple, pour accéder à 30 jours de données historiques, définissez delta.deletedFileRetentionDuration = "interval 30 days"
(qui correspond au paramètre par défaut pour delta.logRetentionDuration
).
L’augmentation du seuil de conservation des données peut entraîner une augmentation de vos coûts de stockage, à mesure que d’autres fichiers de données sont gérés.
Restaurer une table Delta à un état antérieur
Vous pouvez restaurer une table Delta à son état antérieur à l’aide de la commande RESTORE
. Une table Delta conserve en interne ses versions historiques qui permettent sa restauration à un état antérieur.
La commande RESTORE
prend en charge en tant qu’options une version correspondant à l’état antérieur ou un horodateur du moment de la création de l’état antérieur.
Important
- Vous pouvez restaurer une table déjà restaurée.
- Vous pouvez restaurer une table clonée.
- Vous devez disposer d’une autorisation
MODIFY
sur la table en cours de restauration. - Vous ne pouvez pas restaurer une table vers une version antérieure où les fichiers de données ont été supprimés manuellement ou par
vacuum
. Une restauration partielle vers cette version reste possible sispark.sql.files.ignoreMissingFiles
a la valeurtrue
. - Le format de timestamp pour la restauration vers un état antérieur est
yyyy-MM-dd HH:mm:ss
. La fourniture d’une seule chaîne de type date (yyyy-MM-dd
) est également prise en charge.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Pour plus d’informations sur la syntaxe, consultez RESTORE.
Important
La restauration est considérée comme une opération de modification des données. Les entrées de journal Delta Lake ajoutées par la commande RESTORE
contiennent dataChange défini sur true. S’il existe une application en aval, comme un travail de Diffusion en continu structurée qui traite les mises à jour d’une table Delta Lake, les entrées du journal des modifications de données ajoutées par l’opération de restauration sont considérées comme de nouvelles mises à jour de données, et leur traitement peut entraîner des données en double.
Par exemple :
Version de la table | Opération | Mises à jour du journal Delta | Enregistrements dans les mises à jour du journal des modifications de données |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (No records as Optimize compaction does not change the data in the table) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
Dans l’exemple précédent, la commande RESTORE
génère des mises à jour qui ont déjà été vues lors de la lecture de la table Delta version 0 et 1. Si une requête de diffusion en continu a lu cette table, ces fichiers seront considérés comme des données nouvellement ajoutées et seront traités à nouveau.
Restaurer des métriques
RESTORE
présente les métriques suivantes sous la forme d’une ligne unique de tramedonnées une fois l’opération terminée :
table_size_after_restore
: taille de la table après restauration.num_of_files_after_restore
: nombre de fichiers dans la table après restauration.num_removed_files
: nombre de fichiers supprimés (logiquement) de la table.num_restored_files
: nombre de fichiers restaurés en raison d’une restauration.removed_files_size
: taille totale en octets des fichiers supprimés de la table.restored_files_size
: taille totale en octets des fichiers restaurés.
Exemples d’utilisation du voyage dans le temps Delta Lake
Correction des suppressions accidentelles d’une table pour l’utilisateur
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Correction des mises à jour accidentelles incorrectes d’une table :
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Interroger le nombre de nouveaux clients ajoutés au cours de la semaine dernière.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Comment rechercher la version de dernier commit dans la session Spark ?
Pour obtenir le numéro de version de la dernière validation écrite par la SparkSession
en cours sur l’ensemble des threads et des tables, interrogez la configuration SQL spark.databricks.delta.lastCommitVersionInSession
.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Si aucune validation n’a été effectuée par la session SparkSession
, l’interrogation de la clé retourne une valeur vide.
Notes
Si vous partagez la même SparkSession
entre plusieurs threads, cela est similaire au partage d’une variable. Vous risquez de rencontrer des situations de conflit d’accès car la valeur de configuration est mise à jour simultanément.