Delta Lake tablo geçmişiyle çalışma
Delta Lake tablosunu değiştiren her işlem yeni bir tablo sürümü oluşturur. Zaman yolculuğu kullanarak işlemleri denetlemek, bir tabloyu geri almak veya belirli bir zamanda tabloyu sorgulamak için geçmiş bilgilerini kullanabilirsiniz.
Not
Databricks, Delta Lake tablo geçmişinin veri arşivleme için uzun vadeli bir yedekleme çözümü olarak kullanılmasını önermez. Databricks, hem veri hem de günlük saklama yapılandırmalarını daha büyük bir değere ayarlamadığınız sürece zaman atlatma işlemleri için yalnızca son 7 günün kullanılmasını önerir.
Delta tablosu geçmişini alma
Komutunu çalıştırarak history
Delta tablosuna yapılan her yazma işlemi, kullanıcı ve zaman damgası gibi bilgileri alabilirsiniz. İşlemler ters kronolojik sırayla döndürülür.
Tablo geçmişi saklama, varsayılan olarak 30 gün olan tablo ayarı delta.logRetentionDuration
tarafından belirlenir.
Not
Zaman atlatma ve tablo geçmişi, farklı bekletme eşikleri tarafından denetlenmektedir. Bkz . Delta Lake zaman atlatma nedir?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Spark SQL söz dizimi ayrıntıları için bkz . DESCRIBE HISTORY.
Scala/Java/Python söz dizimi ayrıntıları için Delta Lake API belgelerine bakın.
Katalog Gezgini , Delta tabloları için bu ayrıntılı tablo bilgilerinin ve geçmişinin görsel bir görünümünü sağlar. Tablo şemasına ve örnek verilere ek olarak, ile DESCRIBE HISTORY
görüntülenen tablo geçmişini görmek için Geçmiş sekmesine tıklayabilirsiniz.
Geçmiş şeması
İşlemin çıktısı history
aşağıdaki sütunlara sahiptir.
Sütun | Türü | Açıklama |
---|---|---|
sürüm | uzun | İşlem tarafından oluşturulan tablo sürümü. |
timestamp | timestamp | Bu sürüm işlendiğinde. |
userId | Dize | İşlemi çalıştıran kullanıcının kimliği. |
userName | Dize | İşlemi çalıştıran kullanıcının adı. |
operation | Dize | İşlemin adı. |
operationParameters | map | İşlemin parametreleri (örneğin, koşul.) |
iş | struct | İşlemi çalıştıran işin ayrıntıları. |
not defteri | struct | İşlemin çalıştırıldığı not defterinin ayrıntıları. |
clusterId | Dize | İşlemin üzerinde çalıştırıldığı kümenin kimliği. |
readVersion | uzun | Yazma işlemini gerçekleştirmek için okunan tablonun sürümü. |
isolationLevel | Dize | Bu işlem için kullanılan yalıtım düzeyi. |
isBlindAppend | boolean | Bu işlemin verileri ekleyip eklemediği. |
operationMetrics | map | İşlemin ölçümleri (örneğin, değiştirilen satır ve dosya sayısı.) |
userMetadata | Dize | Belirtilmişse kullanıcı tanımlı işleme meta verileri |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Not
- Aşağıdaki yöntemleri kullanarak bir Delta tablosuna yazarsanız diğer sütunlardan birkaçı kullanılamaz:
- Gelecekte eklenen sütunlar her zaman son sütundan sonra eklenir.
İşlem ölçümleri anahtarları
İşlem, history
sütun haritasında operationMetrics
bir işlem ölçümleri koleksiyonu döndürür.
Aşağıdaki tablolarda eşleme anahtarı tanımları işleme göre listelemektedir.
İşlem | Ölçüm adı | Açıklama |
---|---|---|
YAZMA, SEÇME OLARAK TABLO OLUŞTURMA, TABLOYU SEÇ OLARAK DEĞIŞTIRME, IÇINE KOPYALAMA | ||
numFiles | Yazılan dosya sayısı. | |
numOutputBytes | Yazılan içeriğin bayt cinsinden boyutu. | |
numOutputRows | Yazılan satır sayısı. | |
AKıŞ GÜNCELLEŞTIRMESI | ||
numAddedFiles | Eklenen dosya sayısı. | |
numRemovedFiles | Kaldırılan dosya sayısı. | |
numOutputRows | Yazılan satır sayısı. | |
numOutputBytes | Bayt cinsinden yazma boyutu. | |
SİL | ||
numAddedFiles | Eklenen dosya sayısı. Tablonun bölümleri silindiğinde sağlanmaz. | |
numRemovedFiles | Kaldırılan dosya sayısı. | |
numDeletedRows | Kaldırılan satır sayısı. Tablonun bölümleri silindiğinde sağlanmaz. | |
numCopiedRows | Dosyaları silme işleminde kopyalanan satır sayısı. | |
executionTimeMs | İşlemin tamamını yürütmek için geçen süre. | |
scanTimeMs | Dosyaları eşleşmeler için taramak için geçen süre. | |
rewriteTimeMs | Eşleşen dosyaları yeniden yazmak için geçen süre. | |
TRUNCATE | ||
numRemovedFiles | Kaldırılan dosya sayısı. | |
executionTimeMs | İşlemin tamamını yürütmek için geçen süre. | |
BİRLEŞMEK | ||
numSourceRows | Kaynak DataFrame'deki satır sayısı. | |
numTargetRowsInserted | Hedef tabloya eklenen satır sayısı. | |
numTargetRowsUpdated | Hedef tabloda güncelleştirilen satır sayısı. | |
numTargetRowsDeleted | Hedef tabloda silinen satır sayısı. | |
numTargetRowsCopied | Kopyalanan hedef satır sayısı. | |
numOutputRows | Yazılan toplam satır sayısı. | |
numTargetFilesAdded | Havuza (hedef) eklenen dosya sayısı. | |
numTargetFilesRemoved | Havuzdan (hedef) kaldırılan dosya sayısı. | |
executionTimeMs | İşlemin tamamını yürütmek için geçen süre. | |
scanTimeMs | Dosyaları eşleşmeler için taramak için geçen süre. | |
rewriteTimeMs | Eşleşen dosyaları yeniden yazmak için geçen süre. | |
UPDATE | ||
numAddedFiles | Eklenen dosya sayısı. | |
numRemovedFiles | Kaldırılan dosya sayısı. | |
numUpdatedRows | Güncelleştirilen satır sayısı. | |
numCopiedRows | Dosyaları güncelleştirme işleminde kopyalanan satır sayısı. | |
executionTimeMs | İşlemin tamamını yürütmek için geçen süre. | |
scanTimeMs | Dosyaları eşleşmeler için taramak için geçen süre. | |
rewriteTimeMs | Eşleşen dosyaları yeniden yazmak için geçen süre. | |
FSCK | numRemovedFiles | Kaldırılan dosya sayısı. |
DÖNÜŞTÜRMEK | numConvertedFiles | Dönüştürülen Parquet dosyalarının sayısı. |
OPTIMIZE | ||
numAddedFiles | Eklenen dosya sayısı. | |
numRemovedFiles | İyileştirilmiş dosya sayısı. | |
numAddedBytes | Tablo iyileştirildikten sonra eklenen bayt sayısı. | |
numRemovedBytes | Kaldırılan bayt sayısı. | |
minFileSize | Tablo iyileştirildikten sonra en küçük dosyanın boyutu. | |
p25FileSize | Tablo iyileştirildikten sonra 25. yüzdebirlik dosyasının boyutu. | |
p50FileSize | Tablo iyileştirildikten sonra ortanca dosya boyutu. | |
p75FileSize | Tablo iyileştirildikten sonra 75. yüzdebirlik dosyasının boyutu. | |
maxFileSize | Tablo iyileştirildikten sonra en büyük dosyanın boyutu. | |
CLONE | ||
sourceTableSize | Kopyalanan sürümdeki kaynak tablonun bayt cinsinden boyutu. | |
sourceNumOfFiles | Kopyalanan sürümdeki kaynak tablodaki dosya sayısı. | |
numRemovedFiles | Önceki bir Delta tablosu değiştirildiyse hedef tablodan kaldırılan dosya sayısı. | |
removedFilesSize | Önceki bir Delta tablosu değiştirildiyse hedef tablodan kaldırılan dosyaların bayt cinsinden toplam boyutu. | |
numCopiedFiles | Yeni konuma kopyalanan dosyaların sayısı. Sığ klonlar için 0. | |
copiedFilesSize | Yeni konuma kopyalanan dosyaların bayt cinsinden toplam boyutu. Sığ klonlar için 0. | |
GERİ YÜKLEMEK | ||
tableSizeAfterRestore | Geri yüklemeden sonra bayt cinsinden tablo boyutu. | |
numOfFilesAfterRestore | Geri yüklemeden sonra tablodaki dosya sayısı. | |
numRemovedFiles | Geri yükleme işlemi tarafından kaldırılan dosya sayısı. | |
numRestoredFiles | Geri yükleme sonucunda eklenen dosya sayısı. | |
removedFilesSize | Geri yükleme tarafından kaldırılan dosyaların bayt cinsinden boyutu. | |
restoredFilesSize | Geri yükleme tarafından eklenen dosyaların bayt cinsinden boyutu. | |
VACUUM | ||
numDeletedFiles | Silinen dosya sayısı. | |
numVacuumedDirectories | Vakumlanmış dizin sayısı. | |
numFilesToDelete | Silinecek dosya sayısı. |
Delta Lake zaman yolculuğu nedir?
Delta Lake zaman yolculuğu, zaman damgasına veya tablo sürümüne göre (işlem günlüğünde kaydedildiği gibi) önceki tablo sürümlerini sorgulamayı destekler. Aşağıdaki gibi uygulamalar için zaman yolculuğu kullanabilirsiniz:
- Çözümlemeleri, raporları veya çıkışları yeniden oluşturma (örneğin, bir makine öğrenmesi modelinin çıktısı). Bu, özellikle düzenlemeye tabi sektörlerde hata ayıklama veya denetim için yararlı olabilir.
- Karmaşık zamana bağlı sorgular yazma.
- Verilerinizdeki hataları düzeltme.
- Hızlı değişen tablolara yönelik bir dizi sorgu için anlık görüntü yalıtımı sağlama.
Önemli
Zaman yolculuğuyla erişilebilen tablo sürümleri, işlem günlüğü dosyaları için bekletme eşiği ile işlemler için VACUUM
sıklık ve belirtilen bekletmenin bir bileşimiyle belirlenir. Günlük olarak varsayılan değerlerle çalışıyorsanız VACUUM
, zaman yolculuğu için 7 günlük veri kullanılabilir.
Delta zaman atlatma söz dizimi
Tablo adı belirtiminin arkasına bir yan tümce ekleyerek bir Delta tablosunu zaman yolculuğuyla sorgularsınız.
timestamp_expression
şu türlerden herhangi biri olabilir:'2018-10-18T22:15:12.013Z'
, başka bir ifadeyle zaman damgasına atanabilen bir dizedircast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, yani bir tarih dizesicurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- Zaman damgasına atanabilen veya atanabilen diğer tüm ifadeler
version
, çıkışından elde edilebilen uzun bir değerdirDESCRIBE HISTORY table_spec
.
Alt timestamp_expression
sorgular da version
olamaz.
Yalnızca tarih veya zaman damgası dizeleri kabul edilir. Örneğin, "2019-01-01"
ve "2019-01-01T00:00:00.000Z"
. Söz dizimi gibi aşağıdaki koda bakın:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Zaman damgasını @
veya sürümü tablo adının bir parçası olarak belirtmek için söz dizimini de kullanabilirsiniz. Zaman damgası biçiminde yyyyMMddHHmmssSSS
olmalıdır. sürümünden sonra @
bir ön koşula bağlanarak bir v
sürüm belirtebilirsiniz. Söz dizimi gibi aşağıdaki koda bakın:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
İşlem günlüğü denetim noktaları nedir?
Delta Lake, tablo sürümlerini dizin içinde _delta_log
JSON dosyaları olarak kaydeder ve bu dosyalar tablo verileriyle birlikte depolanır. Kontrol noktası sorgulamasını iyileştirmek için Delta Lake tablo sürümlerini Parquet denetim noktası dosyalarına toplayarak tablo geçmişinin tüm JSON sürümlerini okuma gereksinimini önler. Azure Databricks, veri boyutu ve iş yükü için denetim noktası oluşturma sıklığını iyileştirir. Kullanıcıların denetim noktalarıyla doğrudan etkileşim kurması gerekmez. Denetim noktası sıklığı bildirimde bulunmadan değiştirilebilir.
Zaman yolculuğu sorguları için veri saklamayı yapılandırma
Önceki bir tablo sürümünü sorgulamak için, bu sürüm için hem günlüğü hem de veri dosyalarını tutmanız gerekir.
Bir tabloda çalıştırıldığında VACUUM
veri dosyaları silinir. Delta Lake, tablo sürümlerini denetledikten sonra günlük dosyasının kaldırılmasını otomatik olarak yönetir.
Delta tablolarının VACUUM
çoğu bunlara karşı düzenli olarak çalıştığından, belirli bir noktaya sorgular için VACUUM
varsayılan olarak 7 gün olan bekletme eşiğine uygun olmalıdır.
Delta tablolarında veri saklama eşiğini artırmak için aşağıdaki tablo özelliklerini yapılandırmanız gerekir:
delta.logRetentionDuration = "interval <interval>"
: bir tablonun geçmişinin ne kadar süreyle tutulduğunu denetler. Varsayılan değer:interval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: geçerli tablo sürümünde artık başvurulmayan veri dosyalarını kaldırmak için kullanılan eşiğiVACUUM
belirler. Varsayılan değer:interval 7 days
.
Tablo oluşturma sırasında Delta özelliklerini belirtebilir veya bunları bir ALTER TABLE
deyimiyle ayarlayabilirsiniz. Bkz . Delta tablosu özellikleri başvurusu.
Not
Sık VACUUM
işlemleri olan tablolarda tablo geçmişinin daha uzun süre tutulabilmesi için bu özelliklerin her ikisini de ayarlamanız gerekir. Örneğin, 30 günlük geçmiş verilerine erişmek için ayarlayın delta.deletedFileRetentionDuration = "interval 30 days"
(için delta.logRetentionDuration
varsayılan ayarla eşleşir).
Veri saklama eşiğinin artırılması, daha fazla veri dosyası tutuldukçe depolama maliyetlerinizin artmasına neden olabilir.
Delta tablosunu önceki bir duruma geri yükleme
Komutunu kullanarak RESTORE
Delta tablosunu önceki durumuna geri yükleyebilirsiniz. Delta tablosu, tablonun önceki bir duruma geri yüklenmesini sağlayan geçmiş sürümlerini dahili olarak korur.
Önceki duruma veya önceki durumun oluşturulduğu zaman damgasına karşılık gelen bir sürüm, RESTORE
komutu tarafından seçenekler olarak desteklenir.
Önemli
- Zaten geri yüklenmiş bir tabloyu geri yükleyebilirsiniz.
- Kopyalanmış bir tabloyu geri yükleyebilirsiniz.
- Geri yüklenen tablo üzerinde izniniz olmalıdır
MODIFY
. - Bir tabloyu, veri dosyalarının el ile veya tarafından
vacuum
silindiği eski bir sürüme geri yükleyemezsiniz. bu sürüme kısmen geri yükleme, olarak ayarlandıysaspark.sql.files.ignoreMissingFiles
true
yine de mümkündür. - Önceki bir duruma geri yüklemek için zaman damgası biçimi şeklindedir
yyyy-MM-dd HH:mm:ss
. Yalnızca bir tarih(yyyy-MM-dd
) dizesi sağlanması da desteklenir.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Söz dizimi ayrıntıları için bkz . RESTORE.
Önemli
Geri yükleme, veri değiştiren bir işlem olarak kabul edilir. komutu tarafından RESTORE
eklenen Delta Lake günlük girdileri dataChange değerini true olarak ayarlar. Delta Lake tablosundaki güncelleştirmeleri işleyen yapılandırılmış akış işi gibi bir aşağı akış uygulaması varsa, geri yükleme işlemi tarafından eklenen veri değişikliği günlük girişleri yeni veri güncelleştirmeleri olarak kabul edilir ve bunların işlenmesi yinelenen verilere neden olabilir.
Örneğin:
Tablo sürümü | İşlem | Delta günlüğü güncelleştirmeleri | Veri değişikliği günlük güncelleştirmelerindeki kayıtlar |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (ad = Viktor, yaş = 29, (ad = George, yaş = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (Sıkıştırmayı iyileştir olarak hiçbir kayıt tablodaki verileri değiştirmez) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (ad = Viktor, yaş = 29), (ad = George, yaş = 55), (ad = George, yaş = 39) |
Yukarıdaki örnekte, komut Delta RESTORE
tablosu sürüm 0 ve 1 okunurken zaten görülen güncelleştirmelere neden olur. Bir akış sorgusu bu tabloyu okuyorsa, bu dosyalar yeni eklenen veriler olarak kabul edilir ve yeniden işlenir.
Ölçümleri geri yükleme
RESTORE
işlem tamamlandıktan sonra aşağıdaki ölçümleri tek satırlık DataFrame olarak bildirir:
table_size_after_restore
: Geri yüklemeden sonra tablonun boyutu.num_of_files_after_restore
: Geri yüklemeden sonra tablodaki dosya sayısı.num_removed_files
: Tablodan kaldırılan (mantıksal olarak silinen) dosya sayısı.num_restored_files
: Geri dönme nedeniyle geri yüklenen dosyaların sayısı.removed_files_size
: Tablodan kaldırılan dosyaların bayt cinsinden toplam boyutu.restored_files_size
: Geri yüklenen dosyaların bayt cinsinden toplam boyutu.
Delta Lake zaman atlatma kullanımına örnekler
Kullanıcının
111
tabloya yanlışlıkla silmelerini düzeltin:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Tabloya yanlışlıkla yapılan yanlış güncelleştirmeler düzeltilir:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Geçen hafta eklenen yeni müşterilerin sayısını sorgula.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Spark oturumunda son işlemenin sürümünü Nasıl yaparım? buldunuz?
Geçerli SparkSession
tarafından tüm iş parçacıklarında ve tüm tablolarda yazılan son işlemenin sürüm numarasını almak için SQL yapılandırmasını spark.databricks.delta.lastCommitVersionInSession
sorgular.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
tarafından SparkSession
hiçbir işleme yapılmadıysa, anahtarın sorgulanması boş bir değer döndürür.
Not
Aynı SparkSession
değeri birden çok iş parçacığında paylaşıyorsanız, bu bir değişkeni birden çok iş parçacığı arasında paylaşmaya benzer; yapılandırma değeri eşzamanlı olarak güncelleştirildikçe yarış koşullarına çarpabilirsiniz.