Práce s historií tabulek Delta Lake
Každá operace, která upravuje tabulku Delta Lake, vytvoří novou verzi tabulky. Informace o historii můžete použít k auditování operací, vrácení tabulky zpět nebo dotazování na tabulku v určitém časovém okamžiku pomocí časového cestování.
Poznámka:
Databricks nedoporučuje používat historii tabulky Delta Lake jako řešení dlouhodobého zálohování pro archivaci dat. Pokud jste nenastavili větší hodnotu pro uchovávání dat i protokolů, doporučuje Databricks používat pro operace pohyb v čase pouze posledních 7 dnů.
Načtení historie tabulky Delta
Spuštěním příkazu můžete načíst informace, včetně operací, uživatelů a časového razítka každého zápisu history
do tabulky Delta. Operace se vrátí v obráceném chronologickém pořadí.
Uchovávání historie tabulek je určeno nastavením delta.logRetentionDuration
tabulky, což je ve výchozím nastavení 30 dnů.
Poznámka:
Pohyb v čase a historie tabulky jsou řízeny různými prahovými hodnotami uchovávání dat. Přečtěte si: Co je pohyb v čase ve službě Delta Lake?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Podrobnosti o syntaxi Spark SQL najdete v tématu POPIS HISTORIE.
Podrobnosti o syntaxi jazyka Scala/Java/Python najdete v dokumentaci k rozhraní Delta Lake API.
Průzkumník katalogu poskytuje vizuální zobrazení těchto podrobných informací o tabulce a historii tabulek Delta. Kromě schématu tabulky a ukázkových dat můžete kliknutím na kartu Historie zobrazit historii tabulek, která se zobrazí pomocí DESCRIBE HISTORY
.
Schéma historie
Výstup operace history
má následující sloupce.
Column | Type | Popis |
---|---|---|
version | long | Verze tabulky vygenerovaná operací |
časové razítko | časové razítko | Při potvrzení této verze. |
userId | string | ID uživatele, který operaci spustil. |
userName | string | Jméno uživatele, který operaci spustil. |
operation | string | Název operace. |
operationParameters | map | Parametry operace (například predikáty).) |
úloha | struct | Podrobnosti o úloze, která operaci spustila. |
poznámkový blok | struct | Podrobnosti o poznámkovém bloku, ze kterého byla operace spuštěna. |
clusterId | string | ID clusteru, na kterém byla operace spuštěna. |
readVersion | long | Verze tabulky, která byla přečtená k provedení operace zápisu. |
isolationLevel | string | Úroveň izolace použitá pro tuto operaci. |
isBlindAppend | boolean | Určuje, jestli tato operace připojila data. |
operationMetrics | map | Metriky operace (například počet řádků a souborů upravených).) |
userMetadata | string | Uživatelsky definovaná metadata potvrzení, pokud byla zadána |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Poznámka:
- Pokud do tabulky Delta zapíšete následující metody, není k dispozici několik dalších sloupců:
- Sloupce přidané v budoucnu se vždy přidají za poslední sloupec.
Klíče metrik operací
Operace history
vrátí kolekci metrik operací v mapě operationMetrics
sloupců.
Následující tabulky uvádějí definice klíčů mapování podle operace.
Operace | Název metriky | Popis |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | Počet zapsaných souborů | |
numOutputBytes | Velikost v bajtech psaného obsahu | |
numOutputRows | Počet zapsaných řádků | |
AKTUALIZACE STREAMOVÁNÍ | ||
numAddedFiles | Počet přidaných souborů | |
numRemovedFiles | Počet odebraných souborů | |
numOutputRows | Počet zapsaných řádků | |
numOutputBytes | Velikost zápisu v bajtech | |
DELETE | ||
numAddedFiles | Počet přidaných souborů Nezadá se při odstranění oddílů tabulky. | |
numRemovedFiles | Počet odebraných souborů | |
numDeletedRows | Počet odebraných řádků Nezadá se při odstranění oddílů tabulky. | |
numCopiedRows | Počet řádků zkopírovaných v procesu odstraňování souborů | |
executionTimeMs | Doba potřebná ke spuštění celé operace. | |
scanTimeMs | Doba potřebná ke kontrole shodných souborů | |
přepsáníTimeMs | Doba potřebná k přepsání odpovídajících souborů | |
ZKRÁTIT | ||
numRemovedFiles | Počet odebraných souborů | |
executionTimeMs | Doba potřebná ke spuštění celé operace. | |
SLOUČIT | ||
numSourceRows | Počet řádků ve zdrojovém datovém rámci | |
numTargetRowsInserted | Počet řádků vložených do cílové tabulky | |
numTargetRowsUpdated | Počet řádků aktualizovaných v cílové tabulce | |
numTargetRowsDeleted | Počet řádků odstraněných v cílové tabulce | |
numTargetRowsCopied | Počet zkopírovaných cílových řádků | |
numOutputRows | Celkový počet odpsaných řádků | |
numTargetFilesAdded | Počet souborů přidaných do jímky (cíle). | |
numTargetFilesRemoved | Počet souborů odebraných z jímky (cíle). | |
executionTimeMs | Doba potřebná ke spuštění celé operace. | |
scanTimeMs | Doba potřebná ke kontrole shodných souborů | |
přepsáníTimeMs | Doba potřebná k přepsání odpovídajících souborů | |
UPDATE | ||
numAddedFiles | Počet přidaných souborů | |
numRemovedFiles | Počet odebraných souborů | |
numUpdatedRows | Počet aktualizovaných řádků | |
numCopiedRows | Početřádkůchch | |
executionTimeMs | Doba potřebná ke spuštění celé operace. | |
scanTimeMs | Doba potřebná ke kontrole shodných souborů | |
přepsáníTimeMs | Doba potřebná k přepsání odpovídajících souborů | |
FSCK | numRemovedFiles | Počet odebraných souborů |
PŘEMĚNIT | numConvertedFiles | Počet převedených souborů Parquet. |
OPTIMIZE | ||
numAddedFiles | Počet přidaných souborů | |
numRemovedFiles | Počet optimalizovaných souborů | |
numAddedBytes | Počet bajtů přidaných po optimalizaci tabulky | |
numRemovedBytes | Počet odebraných bajtů | |
minFileSize | Velikost nejmenšího souboru po optimalizaci tabulky | |
p25FileSize | Velikost 25. percentilového souboru po optimalizaci tabulky | |
p50FileSize | Medián velikosti souboru po optimalizaci tabulky | |
p75FileSize | Velikost 75. percentilového souboru po optimalizaci tabulky | |
maxFileSize | Velikost největšího souboru po optimalizaci tabulky | |
CLONE | ||
sourceTableSize | Velikost v bajtech zdrojové tabulky ve verzi, která je naklonovaná. | |
sourceNumOfFiles | Počet souborů ve zdrojové tabulce v naklonované verzi | |
numRemovedFiles | Počet souborů odebraných z cílové tabulky, pokud byla nahrazena předchozí tabulka Delta. | |
removedFilesSize | Celková velikost v bajtech souborů odebraných z cílové tabulky, pokud byla nahrazena předchozí tabulka Delta. | |
numCopiedFiles | Počet souborů, které byly zkopírovány do nového umístění 0 pro mělké klony. | |
copiedFilesSize | Celková velikost v bajtech souborů, které byly zkopírovány do nového umístění. 0 pro mělké klony. | |
OBNOVIT | ||
tableSizeAfterRestore | Velikost tabulky v bajtech po obnovení | |
numOfFilesAfterRestore | Počet souborů v tabulce po obnovení | |
numRemovedFiles | Počet souborů odebraných operací obnovení | |
numRestoredFiles | Počet souborů přidaných v důsledku obnovení | |
removedFilesSize | Velikost v bajtech souborů odebraných obnovením | |
restoredFilesSize | Velikost v bajtech souborů přidaných obnovením | |
VACUUM | ||
numDeletedFiles | Počet odstraněných souborů | |
numVacuumedDirectories | Počet vakuových adresářů. | |
numFilesToDelete | Počet souborů, které chcete odstranit |
Co je časová cesta Delta Lake?
Časová cesta Delta Lake podporuje dotazování předchozích verzí tabulek na základě časového razítka nebo verze tabulky (jak je zaznamenáno v transakčním protokolu). Pro aplikace, jako jsou například tyto, můžete použít časová cesta:
- Opětovné vytváření analýz, sestav nebo výstupů (například výstup modelu strojového učení). To může být užitečné pro ladění nebo auditování, zejména v regulovaných odvětvích.
- Psaní složitých dočasných dotazů
- Oprava chyb v datech
- Poskytuje izolaci snímků pro sadu dotazů pro rychlé změny tabulek.
Důležité
Verze tabulek přístupné s časovým cestováním jsou určeny kombinací prahové hodnoty uchovávání pro soubory transakčních protokolů a četností a určeným uchováváním operací VACUUM
. Pokud spouštíte VACUUM
každý den s výchozími hodnotami, 7 dnů dat je k dispozici pro časovou cestu.
Syntaxe pohybu v čase Delta
Dotazujete tabulku Delta s časovým cestováním přidáním klauzule za specifikaci názvu tabulky.
timestamp_expression
může být libovolná z těchto možností:'2018-10-18T22:15:12.013Z'
to znamená řetězec, který lze přetypovat na časové razítko.cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
to znamená řetězec kalendářního data.current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Jakýkoli jiný výraz, který je nebo lze přetypovat na časové razítko
version
je dlouhá hodnota, kterou lze získat z výstupuDESCRIBE HISTORY table_spec
.
version
Ani timestamp_expression
nemůže být poddotaz.
Akceptují se pouze řetězce data nebo časového razítka. Příklad: "2019-01-01"
a "2019-01-01T00:00:00.000Z"
. Příklad syntaxe najdete v následujícím kódu:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Syntaxi můžete také použít @
k určení časového razítka nebo verze jako součásti názvu tabulky. Časové razítko musí být ve yyyyMMddHHmmssSSS
formátu. Verzi @
můžete zadat tak, že ji předejdete v
. Příklad syntaxe najdete v následujícím kódu:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Co jsou kontrolní body transakčního protokolu?
Delta Lake zaznamenává verze tabulek jako soubory JSON v adresáři _delta_log
, které se ukládají společně s daty tabulek. Pokud chcete optimalizovat dotazování kontrolních bodů, Delta Lake agreguje verze tabulek do souborů kontrolních bodů Parquet, což brání nutnosti číst všechny verze tabulek JSON historie. Azure Databricks optimalizuje frekvenci vytváření kontrolních bodů pro velikost dat a úlohu. Uživatelé by neměli pracovat s kontrolními body přímo. Četnost kontrolních bodů se může bez předchozího upozornění změnit.
Konfigurace uchovávání dat pro dotazy na časové cesty
Pokud chcete dotazovat předchozí verzi tabulky, musíte zachovat protokol i datové soubory pro danou verzi.
Datové soubory se odstraní při VACUUM
spuštění v tabulce. Delta Lake spravuje odebrání souboru protokolu automaticky po verzích tabulek kontrolních bodů.
Vzhledem k tomu, že většina tabulek Delta se VACUUM
s nimi spouští pravidelně, měly by dotazy k určitému bodu v čase respektovat prahovou hodnotu VACUUM
uchovávání informací, což je ve výchozím nastavení 7 dnů.
Pokud chcete zvýšit prahovou hodnotu uchovávání dat pro tabulky Delta, musíte nakonfigurovat následující vlastnosti tabulky:
delta.logRetentionDuration = "interval <interval>"
: Určuje, jak dlouho se historie tabulky uchovává. Výchozí hodnota jeinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: určuje prahovou hodnotuVACUUM
, která se používá k odebrání datových souborů, na které se už v aktuální verzi tabulky neodkazuje. Výchozí hodnota jeinterval 7 days
.
Během vytváření tabulky můžete zadat vlastnosti Delta nebo je nastavit pomocí ALTER TABLE
příkazu. Viz odkaz na vlastnosti tabulky Delta.
Poznámka:
Obě tyto vlastnosti musíte nastavit, aby se u tabulek s častými VACUUM
operacemi zachovala historie tabulek po delší dobu. Pokud například chcete získat přístup k historickým datům o 30 dnech, nastavte delta.deletedFileRetentionDuration = "interval 30 days"
(které odpovídá výchozímu nastavení pro delta.logRetentionDuration
).
Zvýšení prahové hodnoty uchovávání dat může způsobit zvýšení nákladů na úložiště, protože se udržuje více datových souborů.
Obnovení tabulky Delta do dřívějšího stavu
Tabulku Delta můžete obnovit do dřívějšího RESTORE
stavu pomocí příkazu. Tabulka Delta interně udržuje historické verze tabulky, které umožňují obnovení do dřívějšího stavu.
Verze odpovídající dřívějšímu stavu nebo časovému razítku toho, když došlo k vytvoření dřívějšího stavu, se příkazem RESTORE
podporují jako možnosti.
Důležité
- Již obnovenou tabulku můžete obnovit.
- Klonovanou tabulku můžete obnovit.
- Musíte mít
MODIFY
oprávnění k obnovené tabulce. - Tabulku nelze obnovit do starší verze, ve které byly datové soubory odstraněny ručně nebo pomocí
vacuum
. Obnovení do této verze je stále možné, pokudspark.sql.files.ignoreMissingFiles
je nastavena natrue
. - Formát časového razítka pro obnovení do dřívějšího stavu je
yyyy-MM-dd HH:mm:ss
. Je také podporováno poskytnutí řetězce date(yyyy-MM-dd
).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Podrobnosti o syntaxi najdete v tématu OBNOVENÍ.
Důležité
Obnovení se považuje za operaci změny dat. Položky protokolu Delta Lake přidané příkazem RESTORE
obsahují dataChange nastavenou na true. Pokud existuje podřízená aplikace, například úloha strukturovaného streamování , která zpracovává aktualizace tabulky Delta Lake, položky protokolu změn dat přidané operací obnovení se považují za nové aktualizace dat a jejich zpracování může vést k duplicitním datům.
Příklad:
Verze tabulky | Operace | Rozdílové aktualizace protokolů | Záznamy v aktualizacích protokolu změn dat |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
0 | INSERT | AddFile(/path/to/file-2; dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3; dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Žádné záznamy jako optimalizace komprimace nemění data v tabulce) |
3 | RESTORE(verze=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
V předchozím příkladu RESTORE
má příkaz za následek aktualizace, které se už zobrazily při čtení tabulky Delta verze 0 a 1. Pokud streamovací dotaz četl tuto tabulku, budou se tyto soubory považovat za nově přidaná data a budou znovu zpracovány.
Obnovení metrik
RESTORE
Po dokončení operace sestaví následující metriky jako datový rámec s jedním řádkem:
table_size_after_restore
: Velikost tabulky po obnovení.num_of_files_after_restore
: Počet souborů v tabulce po obnovení.num_removed_files
: Počet odebraných souborů (logicky odstraněných) z tabulky.num_restored_files
: Počet obnovených souborů kvůli vrácení zpětremoved_files_size
: Celková velikost v bajtech souborů, které jsou z tabulky odebrány.restored_files_size
: Celková velikost v bajtech obnovených souborů.
Příklady použití pohybu v čase Delta Lake
Oprava náhodného odstranění tabulky pro uživatele
111
:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Oprava náhodných nesprávných aktualizací tabulky:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Zadejte dotaz na počet nových zákazníků přidaných za poslední týden.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Návody najít verzi posledního potvrzení v relaci Sparku?
Pokud chcete získat číslo verze posledního potvrzení napsaného aktuální SparkSession
ve všech vláknech a všech tabulkách, zadejte dotaz na konfiguraci spark.databricks.delta.lastCommitVersionInSession
SQL .
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Pokud nebyly provedeny SparkSession
žádné potvrzení , dotazování na klíč vrátí prázdnou hodnotu.
Poznámka:
Pokud sdílíte stejnou hodnotu SparkSession
ve více vláknech, je to podobné sdílení proměnné napříč více vlákny. Pokud se hodnota konfigurace aktualizuje současně, můžete narazit na podmínky časování.