Selektives Überschreiben von Daten mit Delta Lake
Azure Databricks nutzt die Delta Lake-Funktionalität, um zwei verschiedene Optionen für selektive Überschreibungen zu unterstützen:
- Die Option
replaceWhere
ersetzt automatisch alle Datensätze, die einem bestimmten Prädikat entsprechen. - Sie können Datenverzeichnisse mithilfe dynamischer Partitionsüberschreibungen basierend darauf ersetzen, wie Tabellen partitioniert sind.
Für die meisten Vorgänge empfiehlt Databricks die Verwendung von replaceWhere
, um anzugeben, welche Daten überschrieben werden sollen.
Wichtig
Wenn Daten versehentlich überschrieben wurden, können Sie restore verwenden, um die Änderung rückgängig zu machen.
Beliebiges selektives Überschreiben mit replaceWhere
Sie können selektiv nur die Daten überschreiben, die einem beliebigen Ausdruck entsprechen.
Hinweis
SQL erfordert Databricks Runtime 12.2 LTS oder höher.
Mit dem folgenden Befehl werden Ereignisse im Januar in der nach start_date
partitionierten Zieltabelle atomisch durch die Daten in replace_data
ersetzt:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Dieser Beispielcode schreibt die Daten aus replace_data
, überprüft, ob alle Zeilen mit dem Prädikat übereinstimmen, und führt mithilfe der overwrite
-Semantik eine atomische Ersetzung durch. Wenn Werte im Vorgang außerhalb der Einschränkung liegen, tritt standardmäßig ein Fehler auf.
Sie können dieses Verhalten zu overwrite
-Werten innerhalb des Prädikatbereichs und zu insert
-Datensätzen ändern, die außerhalb des angegebenen Bereichs liegen. Deaktivieren Sie dazu die Überprüfung von Einschränkungen, indem Sie spark.databricks.delta.replaceWhere.constraintCheck.enabled
mithilfe einer der folgenden Einstellungen auf FALSE festlegen:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Legacyverhalten
Beim Legacystandardverhalten hat replaceWhere
nur Daten überschrieben, die einem Prädikat über Partitionsspalten entsprachen. Mit diesem Legacymodell würde der folgende Befehl den Monat Januar in der Zieltabelle, die mit date
partitioniert wurde, atomisch durch die Daten in df
ersetzen:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Wenn Sie das alte Verhalten anwenden möchten, können Sie das Flag spark.databricks.delta.replaceWhere.dataColumns.enabled
deaktivieren:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
Dynamische Partitionsüberschreibungen
Wichtig
Dieses Feature befindet sich in der Public Preview.
Databricks Runtime 11.3 LTS und höher unterstützt den dynamischen Partitionsüberschreibmodus für partitionierte Tabellen. Für Tabellen mit mehreren Partitionen unterstützt Databricks Runtime 11.3 LTS und früher nur dynamische Partitionsüberschreibungen, wenn alle Partitionsspalten denselben Datentyp aufweisen.
Im Modus „dynamische Partitionsüberschreibung“ überschreiben Vorgänge alle vorhandenen Daten in jeder logischen Partition, für welche der Schreibvorgang neue Daten committet. Alle vorhandenen logischen Partitionen, für welche der Schreibvorgang keine Daten enthält, bleiben unverändert. Dieser Modus ist nur anwendbar, wenn die Daten im Überschreibmodus geschrieben werden: entweder INSERT OVERWRITE
in SQL oder ein Datenrahmenschreibvorgang mit df.write.mode("overwrite")
.
Konfigurieren Sie den dynamischen Partitionsüberschreibmodus, indem Sie die Spark-Sitzungskonfiguration spark.sql.sources.partitionOverwriteMode
auf dynamic
festlegen. Sie können dies auch aktivieren, indem Sie die DataFrameWriter
-Option partitionOverwriteMode
auf dynamic
festlegen. Wenn vorhanden, setzt die abfragespezifische Option den in der Sitzungskonfiguration definierten Modus außer Kraft. Der Standardwert für partitionOverwriteMode
lautet static
.
Wichtig
Überprüfen Sie, ob die Daten, die mit der dynamischen Partitionsüberschreibung geschrieben werden, nur die erwarteten Partitionen betreffen. Eine einzelne Zeile in der falschen Partition kann dazu führen, dass ungewollt eine ganze Partition überschrieben wird.
Im folgenden Beispiel wird die Verwendung dynamischer Partitionsüberschreibungen veranschaulicht:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Hinweis
- Das dynamische Überschreiben von Partitionen steht im Konflikt mit der Option
replaceWhere
für partitionierte Tabellen.- Wenn das dynamische Überschreiben von Partitionen in der Spark-Sitzungskonfiguration aktiviert ist und
replaceWhere
alsDataFrameWriter
-Option angegeben wird, überschreibt Delta Lake die Daten gemäß demreplaceWhere
-Ausdruck (abfragespezifische Optionen setzen Sitzungskonfigurationen außer Kraft). - Sie erhalten eine Fehlermeldung, wenn für die
DataFrameWriter
-Optionen sowohl die dynamische Partitionsüberschreibung als auchreplaceWhere
aktiviert sind.
- Wenn das dynamische Überschreiben von Partitionen in der Spark-Sitzungskonfiguration aktiviert ist und
- Sie können
overwriteSchema
nicht alstrue
angeben, wenn Sie die dynamische Partitionsüberschreibung verwenden.