Selektives Überschreiben von Daten mit Delta Lake

Azure Databricks nutzt die Delta Lake-Funktionalität, um zwei verschiedene Optionen für selektive Überschreibungen zu unterstützen:

  • Die Option replaceWhere ersetzt automatisch alle Datensätze, die einem bestimmten Prädikat entsprechen.
  • Sie können Datenverzeichnisse mithilfe dynamischer Partitionsüberschreibungen basierend darauf ersetzen, wie Tabellen partitioniert sind.

Für die meisten Vorgänge empfiehlt Databricks die Verwendung von replaceWhere, um anzugeben, welche Daten überschrieben werden sollen.

Wichtig

Wenn Daten versehentlich überschrieben wurden, können Sie restore verwenden, um die Änderung rückgängig zu machen.

Beliebiges selektives Überschreiben mit replaceWhere

Sie können selektiv nur die Daten überschreiben, die einem beliebigen Ausdruck entsprechen.

Hinweis

SQL erfordert Databricks Runtime 12.2 LTS oder höher.

Mit dem folgenden Befehl werden Ereignisse im Januar in der nach start_date partitionierten Zieltabelle atomisch durch die Daten in replace_data ersetzt:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Dieser Beispielcode schreibt die Daten aus replace_data, überprüft, ob alle Zeilen mit dem Prädikat übereinstimmen, und führt mithilfe der overwrite-Semantik eine atomische Ersetzung durch. Wenn Werte im Vorgang außerhalb der Einschränkung liegen, tritt standardmäßig ein Fehler auf.

Sie können dieses Verhalten zu overwrite-Werten innerhalb des Prädikatbereichs und zu insert-Datensätzen ändern, die außerhalb des angegebenen Bereichs liegen. Deaktivieren Sie dazu die Überprüfung von Einschränkungen, indem Sie spark.databricks.delta.replaceWhere.constraintCheck.enabled mithilfe einer der folgenden Einstellungen auf FALSE festlegen:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Legacyverhalten

Beim Legacystandardverhalten hat replaceWhere nur Daten überschrieben, die einem Prädikat über Partitionsspalten entsprachen. Mit diesem Legacymodell würde der folgende Befehl den Monat Januar in der Zieltabelle, die mit date partitioniert wurde, atomisch durch die Daten in df ersetzen:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Wenn Sie das alte Verhalten anwenden möchten, können Sie das Flag spark.databricks.delta.replaceWhere.dataColumns.enabled deaktivieren:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Dynamische Partitionsüberschreibungen

Wichtig

Dieses Feature befindet sich in der Public Preview.

Databricks Runtime 11.3 LTS und höher unterstützt den dynamischen Partitionsüberschreibmodus für partitionierte Tabellen. Für Tabellen mit mehreren Partitionen unterstützt Databricks Runtime 11.3 LTS und früher nur dynamische Partitionsüberschreibungen, wenn alle Partitionsspalten denselben Datentyp aufweisen.

Im Modus „dynamische Partitionsüberschreibung“ überschreiben Vorgänge alle vorhandenen Daten in jeder logischen Partition, für welche der Schreibvorgang neue Daten committet. Alle vorhandenen logischen Partitionen, für welche der Schreibvorgang keine Daten enthält, bleiben unverändert. Dieser Modus ist nur anwendbar, wenn die Daten im Überschreibmodus geschrieben werden: entweder INSERT OVERWRITE in SQL oder ein Datenrahmenschreibvorgang mit df.write.mode("overwrite").

Konfigurieren Sie den dynamischen Partitionsüberschreibmodus, indem Sie die Spark-Sitzungskonfiguration spark.sql.sources.partitionOverwriteMode auf dynamic festlegen. Sie können dies auch aktivieren, indem Sie die DataFrameWriter-Option partitionOverwriteMode auf dynamic festlegen. Wenn vorhanden, setzt die abfragespezifische Option den in der Sitzungskonfiguration definierten Modus außer Kraft. Der Standardwert für partitionOverwriteMode lautet static.

Wichtig

Überprüfen Sie, ob die Daten, die mit der dynamischen Partitionsüberschreibung geschrieben werden, nur die erwarteten Partitionen betreffen. Eine einzelne Zeile in der falschen Partition kann dazu führen, dass ungewollt eine ganze Partition überschrieben wird.

Im folgenden Beispiel wird die Verwendung dynamischer Partitionsüberschreibungen veranschaulicht:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Hinweis

  • Das dynamische Überschreiben von Partitionen steht im Konflikt mit der Option replaceWhere für partitionierte Tabellen.
    • Wenn das dynamische Überschreiben von Partitionen in der Spark-Sitzungskonfiguration aktiviert ist und replaceWhere als DataFrameWriter-Option angegeben wird, überschreibt Delta Lake die Daten gemäß dem replaceWhere-Ausdruck (abfragespezifische Optionen setzen Sitzungskonfigurationen außer Kraft).
    • Sie erhalten eine Fehlermeldung, wenn für die DataFrameWriter-Optionen sowohl die dynamische Partitionsüberschreibung als auch replaceWhere aktiviert sind.
  • Sie können overwriteSchema nicht als true angeben, wenn Sie die dynamische Partitionsüberschreibung verwenden.