Práce s historií tabulek Delta Lake

Každá operace, která upravuje tabulku Delta Lake, vytvoří novou verzi tabulky. Informace o historii můžete použít k auditování operací, vrácení tabulky zpět nebo dotazování na tabulku v určitém časovém okamžiku pomocí časového cestování.

Poznámka:

Databricks nedoporučuje používat historii tabulky Delta Lake jako řešení dlouhodobého zálohování pro archivaci dat. Pokud jste nenastavili větší hodnotu pro uchovávání dat i protokolů, doporučuje Databricks používat pro operace pohyb v čase pouze posledních 7 dnů.

Načtení historie tabulky Delta

Spuštěním příkazu můžete načíst informace, včetně operací, uživatelů a časového razítka každého zápisu history do tabulky Delta. Operace se vrátí v obráceném chronologickém pořadí.

Uchovávání historie tabulek je určeno nastavením delta.logRetentionDurationtabulky, což je ve výchozím nastavení 30 dnů.

Poznámka:

Pohyb v čase a historie tabulky jsou řízeny různými prahovými hodnotami uchovávání dat. Přečtěte si: Co je pohyb v čase ve službě Delta Lake?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Podrobnosti o syntaxi Spark SQL najdete v tématu POPIS HISTORIE.

Podrobnosti o syntaxi jazyka Scala/Java/Python najdete v dokumentaci k rozhraní Delta Lake API.

Průzkumník katalogu poskytuje vizuální zobrazení těchto podrobných informací o tabulce a historii tabulek Delta. Kromě schématu tabulky a ukázkových dat můžete kliknutím na kartu Historie zobrazit historii tabulek, která se zobrazí pomocí DESCRIBE HISTORY.

Schéma historie

Výstup operace history má následující sloupce.

Column Type Popis
version long Verze tabulky vygenerovaná operací
časové razítko časové razítko Při potvrzení této verze.
userId string ID uživatele, který operaci spustil.
userName string Jméno uživatele, který operaci spustil.
operation string Název operace.
operationParameters map Parametry operace (například predikáty).)
úloha struct Podrobnosti o úloze, která operaci spustila.
poznámkový blok struct Podrobnosti o poznámkovém bloku, ze kterého byla operace spuštěna.
clusterId string ID clusteru, na kterém byla operace spuštěna.
readVersion long Verze tabulky, která byla přečtená k provedení operace zápisu.
isolationLevel string Úroveň izolace použitá pro tuto operaci.
isBlindAppend boolean Určuje, jestli tato operace připojila data.
operationMetrics map Metriky operace (například počet řádků a souborů upravených).)
userMetadata string Uživatelsky definovaná metadata potvrzení, pokud byla zadána
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Poznámka:

Klíče metrik operací

Operace history vrátí kolekci metrik operací v mapě operationMetrics sloupců.

Následující tabulky uvádějí definice klíčů mapování podle operace.

Operace Název metriky Popis
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
numFiles Počet zapsaných souborů
numOutputBytes Velikost v bajtech psaného obsahu
numOutputRows Počet zapsaných řádků
AKTUALIZACE STREAMOVÁNÍ
numAddedFiles Počet přidaných souborů
numRemovedFiles Počet odebraných souborů
numOutputRows Počet zapsaných řádků
numOutputBytes Velikost zápisu v bajtech
DELETE
numAddedFiles Počet přidaných souborů Nezadá se při odstranění oddílů tabulky.
numRemovedFiles Počet odebraných souborů
numDeletedRows Počet odebraných řádků Nezadá se při odstranění oddílů tabulky.
numCopiedRows Počet řádků zkopírovaných v procesu odstraňování souborů
executionTimeMs Doba potřebná ke spuštění celé operace.
scanTimeMs Doba potřebná ke kontrole shodných souborů
přepsáníTimeMs Doba potřebná k přepsání odpovídajících souborů
ZKRÁTIT
numRemovedFiles Počet odebraných souborů
executionTimeMs Doba potřebná ke spuštění celé operace.
SLOUČIT
numSourceRows Počet řádků ve zdrojovém datovém rámci
numTargetRowsInserted Počet řádků vložených do cílové tabulky
numTargetRowsUpdated Počet řádků aktualizovaných v cílové tabulce
numTargetRowsDeleted Počet řádků odstraněných v cílové tabulce
numTargetRowsCopied Počet zkopírovaných cílových řádků
numOutputRows Celkový počet odpsaných řádků
numTargetFilesAdded Počet souborů přidaných do jímky (cíle).
numTargetFilesRemoved Počet souborů odebraných z jímky (cíle).
executionTimeMs Doba potřebná ke spuštění celé operace.
scanTimeMs Doba potřebná ke kontrole shodných souborů
přepsáníTimeMs Doba potřebná k přepsání odpovídajících souborů
UPDATE
numAddedFiles Počet přidaných souborů
numRemovedFiles Počet odebraných souborů
numUpdatedRows Počet aktualizovaných řádků
numCopiedRows Početřádkůchch
executionTimeMs Doba potřebná ke spuštění celé operace.
scanTimeMs Doba potřebná ke kontrole shodných souborů
přepsáníTimeMs Doba potřebná k přepsání odpovídajících souborů
FSCK numRemovedFiles Počet odebraných souborů
PŘEMĚNIT numConvertedFiles Počet převedených souborů Parquet.
OPTIMIZE
numAddedFiles Počet přidaných souborů
numRemovedFiles Počet optimalizovaných souborů
numAddedBytes Počet bajtů přidaných po optimalizaci tabulky
numRemovedBytes Počet odebraných bajtů
minFileSize Velikost nejmenšího souboru po optimalizaci tabulky
p25FileSize Velikost 25. percentilového souboru po optimalizaci tabulky
p50FileSize Medián velikosti souboru po optimalizaci tabulky
p75FileSize Velikost 75. percentilového souboru po optimalizaci tabulky
maxFileSize Velikost největšího souboru po optimalizaci tabulky
CLONE
sourceTableSize Velikost v bajtech zdrojové tabulky ve verzi, která je naklonovaná.
sourceNumOfFiles Počet souborů ve zdrojové tabulce v naklonované verzi
numRemovedFiles Počet souborů odebraných z cílové tabulky, pokud byla nahrazena předchozí tabulka Delta.
removedFilesSize Celková velikost v bajtech souborů odebraných z cílové tabulky, pokud byla nahrazena předchozí tabulka Delta.
numCopiedFiles Počet souborů, které byly zkopírovány do nového umístění 0 pro mělké klony.
copiedFilesSize Celková velikost v bajtech souborů, které byly zkopírovány do nového umístění. 0 pro mělké klony.
OBNOVIT
tableSizeAfterRestore Velikost tabulky v bajtech po obnovení
numOfFilesAfterRestore Počet souborů v tabulce po obnovení
numRemovedFiles Počet souborů odebraných operací obnovení
numRestoredFiles Počet souborů přidaných v důsledku obnovení
removedFilesSize Velikost v bajtech souborů odebraných obnovením
restoredFilesSize Velikost v bajtech souborů přidaných obnovením
VACUUM
numDeletedFiles Počet odstraněných souborů
numVacuumedDirectories Počet vakuových adresářů.
numFilesToDelete Počet souborů, které chcete odstranit

Co je časová cesta Delta Lake?

Časová cesta Delta Lake podporuje dotazování předchozích verzí tabulek na základě časového razítka nebo verze tabulky (jak je zaznamenáno v transakčním protokolu). Pro aplikace, jako jsou například tyto, můžete použít časová cesta:

  • Opětovné vytváření analýz, sestav nebo výstupů (například výstup modelu strojového učení). To může být užitečné pro ladění nebo auditování, zejména v regulovaných odvětvích.
  • Psaní složitých dočasných dotazů
  • Oprava chyb v datech
  • Poskytuje izolaci snímků pro sadu dotazů pro rychlé změny tabulek.

Důležité

Verze tabulek přístupné s časovým cestováním jsou určeny kombinací prahové hodnoty uchovávání pro soubory transakčních protokolů a četností a určeným uchováváním operací VACUUM . Pokud spouštíte VACUUM každý den s výchozími hodnotami, 7 dnů dat je k dispozici pro časovou cestu.

Syntaxe pohybu v čase Delta

Dotazujete tabulku Delta s časovým cestováním přidáním klauzule za specifikaci názvu tabulky.

  • timestamp_expression může být libovolná z těchto možností:
    • '2018-10-18T22:15:12.013Z'to znamená řetězec, který lze přetypovat na časové razítko.
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18'to znamená řetězec kalendářního data.
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Jakýkoli jiný výraz, který je nebo lze přetypovat na časové razítko
  • version je dlouhá hodnota, kterou lze získat z výstupu DESCRIBE HISTORY table_spec.

version Ani timestamp_expression nemůže být poddotaz.

Akceptují se pouze řetězce data nebo časového razítka. Příklad: "2019-01-01" a "2019-01-01T00:00:00.000Z". Příklad syntaxe najdete v následujícím kódu:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

Syntaxi můžete také použít @ k určení časového razítka nebo verze jako součásti názvu tabulky. Časové razítko musí být ve yyyyMMddHHmmssSSS formátu. Verzi @ můžete zadat tak, že ji předejdete v . Příklad syntaxe najdete v následujícím kódu:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

Co jsou kontrolní body transakčního protokolu?

Delta Lake zaznamenává verze tabulek jako soubory JSON v adresáři _delta_log , které se ukládají společně s daty tabulek. Pokud chcete optimalizovat dotazování kontrolních bodů, Delta Lake agreguje verze tabulek do souborů kontrolních bodů Parquet, což brání nutnosti číst všechny verze tabulek JSON historie. Azure Databricks optimalizuje frekvenci vytváření kontrolních bodů pro velikost dat a úlohu. Uživatelé by neměli pracovat s kontrolními body přímo. Četnost kontrolních bodů se může bez předchozího upozornění změnit.

Konfigurace uchovávání dat pro dotazy na časové cesty

Pokud chcete dotazovat předchozí verzi tabulky, musíte zachovat protokol i datové soubory pro danou verzi.

Datové soubory se odstraní při VACUUM spuštění v tabulce. Delta Lake spravuje odebrání souboru protokolu automaticky po verzích tabulek kontrolních bodů.

Vzhledem k tomu, že většina tabulek Delta se VACUUM s nimi spouští pravidelně, měly by dotazy k určitému bodu v čase respektovat prahovou hodnotu VACUUMuchovávání informací, což je ve výchozím nastavení 7 dnů.

Pokud chcete zvýšit prahovou hodnotu uchovávání dat pro tabulky Delta, musíte nakonfigurovat následující vlastnosti tabulky:

  • delta.logRetentionDuration = "interval <interval>": Určuje, jak dlouho se historie tabulky uchovává. Výchozí hodnota je interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": určuje prahovou hodnotu VACUUM , která se používá k odebrání datových souborů, na které se už v aktuální verzi tabulky neodkazuje. Výchozí hodnota je interval 7 days.

Během vytváření tabulky můžete zadat vlastnosti Delta nebo je nastavit pomocí ALTER TABLE příkazu. Viz odkaz na vlastnosti tabulky Delta.

Poznámka:

Obě tyto vlastnosti musíte nastavit, aby se u tabulek s častými VACUUM operacemi zachovala historie tabulek po delší dobu. Pokud například chcete získat přístup k historickým datům o 30 dnech, nastavte delta.deletedFileRetentionDuration = "interval 30 days" (které odpovídá výchozímu nastavení pro delta.logRetentionDuration).

Zvýšení prahové hodnoty uchovávání dat může způsobit zvýšení nákladů na úložiště, protože se udržuje více datových souborů.

Obnovení tabulky Delta do dřívějšího stavu

Tabulku Delta můžete obnovit do dřívějšího RESTORE stavu pomocí příkazu. Tabulka Delta interně udržuje historické verze tabulky, které umožňují obnovení do dřívějšího stavu. Verze odpovídající dřívějšímu stavu nebo časovému razítku toho, když došlo k vytvoření dřívějšího stavu, se příkazem RESTORE podporují jako možnosti.

Důležité

  • Již obnovenou tabulku můžete obnovit.
  • Klonovanou tabulku můžete obnovit.
  • Musíte mít MODIFY oprávnění k obnovené tabulce.
  • Tabulku nelze obnovit do starší verze, ve které byly datové soubory odstraněny ručně nebo pomocí vacuum. Obnovení do této verze je stále možné, pokud spark.sql.files.ignoreMissingFiles je nastavena na true.
  • Formát časového razítka pro obnovení do dřívějšího stavu je yyyy-MM-dd HH:mm:ss. Je také podporováno poskytnutí řetězce date(yyyy-MM-dd).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Podrobnosti o syntaxi najdete v tématu OBNOVENÍ.

Důležité

Obnovení se považuje za operaci změny dat. Položky protokolu Delta Lake přidané příkazem RESTORE obsahují dataChange nastavenou na true. Pokud existuje podřízená aplikace, například úloha strukturovaného streamování , která zpracovává aktualizace tabulky Delta Lake, položky protokolu změn dat přidané operací obnovení se považují za nové aktualizace dat a jejich zpracování může vést k duplicitním datům.

Příklad:

Verze tabulky Operace Rozdílové aktualizace protokolů Záznamy v aktualizacích protokolu změn dat
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
0 INSERT AddFile(/path/to/file-2; dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3; dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Žádné záznamy jako optimalizace komprimace nemění data v tabulce)
3 RESTORE(verze=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

V předchozím příkladu RESTORE má příkaz za následek aktualizace, které se už zobrazily při čtení tabulky Delta verze 0 a 1. Pokud streamovací dotaz četl tuto tabulku, budou se tyto soubory považovat za nově přidaná data a budou znovu zpracovány.

Obnovení metrik

RESTORE Po dokončení operace sestaví následující metriky jako datový rámec s jedním řádkem:

  • table_size_after_restore: Velikost tabulky po obnovení.

  • num_of_files_after_restore: Počet souborů v tabulce po obnovení.

  • num_removed_files: Počet odebraných souborů (logicky odstraněných) z tabulky.

  • num_restored_files: Počet obnovených souborů kvůli vrácení zpět

  • removed_files_size: Celková velikost v bajtech souborů, které jsou z tabulky odebrány.

  • restored_files_size: Celková velikost v bajtech obnovených souborů.

    Příklad obnovení metrik

Příklady použití pohybu v čase Delta Lake

  • Oprava náhodného odstranění tabulky pro uživatele 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Oprava náhodných nesprávných aktualizací tabulky:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Zadejte dotaz na počet nových zákazníků přidaných za poslední týden.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

Návody najít verzi posledního potvrzení v relaci Sparku?

Pokud chcete získat číslo verze posledního potvrzení napsaného aktuální SparkSession ve všech vláknech a všech tabulkách, zadejte dotaz na konfiguraci spark.databricks.delta.lastCommitVersionInSessionSQL .

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Pokud nebyly provedeny SparkSessionžádné potvrzení , dotazování na klíč vrátí prázdnou hodnotu.

Poznámka:

Pokud sdílíte stejnou hodnotu SparkSession ve více vláknech, je to podobné sdílení proměnné napříč více vlákny. Pokud se hodnota konfigurace aktualizuje současně, můžete narazit na podmínky časování.