Delta Lake を使用してデータを選択的に上書きする
Azure Databricks は Delta Lake 機能を利用して、選択的上書きの 2 つの異なるオプションをサポートします。
replaceWhere
オプションは、特定の述語に一致するすべてのレコードをアトミックに置き換えます。- 動的なパーティションの上書きを使用すると、テーブルのパーティション分割方法に基づいてデータのディレクトリを置き換えることができます。
Databricks では、ほとんどの操作で replaceWhere
を使用して、上書きするデータを指定することをお勧めします。
重要
データが誤って上書きされた場合は、復元を使用して変更を元に戻すことができます。
replaceWhere
を使用した任意の選択的上書き
任意の式に一致するデータだけを選択的に上書きすることができます。
Note
SQL には、Databricks Runtime 12.2 LTS 以降が必要です。
次のコマンドは、start_date
によってパーティション分割されているターゲット テーブルの 1 月のイベントを、replace_data
のデータにアトミックに置き換えます。
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
このサンプル コードでは、replace_data
にデータを書き出し、すべての行が述語に一致することを検証し、overwrite
セマンティクスを使用してアトミックな置換を実行します。 操作内の値が制約の範囲外にある場合、この操作は既定でエラーが発生して失敗します。
この動作を、述語範囲内の overwrite
値と、指定された範囲外の insert
レコードに変更できます。 これを行うには、次のいずれかの設定を使用して、spark.databricks.delta.replaceWhere.constraintCheck.enabled
を false に設定して制約チェックを無効にします。
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
従来の動作
従来の既定の動作では、replaceWhere
では、述語に一致するデータがパーティション列だけで上書きされていました。 このレガシ モデルでは、次のコマンドは、date
によってパーティション分割されているターゲット テーブルの 1 月の月を、df
のデータにアトミックに置き換えていました。
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
以前の動作に戻したい場合は、spark.databricks.delta.replaceWhere.dataColumns.enabled
フラグを無効にすることができます。
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
パーティションの動的な上書き
重要
この機能はパブリック プレビュー段階にあります。
Databricks Runtime 11.3 LTS 以降では、パーティション テーブルのパーティションの "動的な" 上書きモードがサポートされています。 複数のパーティションを持つテーブルの場合、Databricks Runtime 11.3 LTS 以下では、すべてのパーティション列が同じデータ型の場合にのみ、パーティションの動的な上書きがサポートされます。
パーティションの動的な上書きモードでは、書き込みが新しいデータをコミットする各論理パーティション内のすべての既存データが上書きされます。 書き込みにデータが含まれていない既存の論理パーティションは変更されません。 このモードは、データが上書きモードで書き込まれる SQL の INSERT OVERWRITE
または df.write.mode("overwrite")
の DataFrame 書き込みのいずれかの場合にのみ適用されます。
パーティションの動的な上書きモードを構成するには、Spark セッションを spark.sql.sources.partitionOverwriteMode
から dynamic
に構成します。 これは、DataFrameWriter
オプションの partitionOverwriteMode
を dynamic
に設定することによっても有効にできます。 存在する場合、セッション構成で定義されているモードは、クエリ固有のオプションによりオーバーライドされます。 partitionOverwriteMode
の規定値は static
です。
重要
パーティションの動的な上書きで、期待しているパーティションにのみデータが書き込まれれることを確認してください。 正しくないパーティション内の 1 つの行によって、意図せずにパーティション全体が上書きされる可能性があります。
パーティションの動的な上書きの使用例は次のとおりです。
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
注意
- パーティションの動的な上書きは、パーティション テーブルのオプション
replaceWhere
と競合します。- Spark セッション構成でパーティションの動的な上書きが有効になっていて、
replaceWhere
がDataFrameWriter
オプションとして指定されている場合、Delta Lake はreplaceWhere
式に従ってデータを上書きします (セッション構成はクエリ固有のオプションによりオーバーライドされます)。 DataFrameWriter
オプションで、パーティションの動的な上書きとreplaceWhere
が両方とも有効になっている場合、エラーが表示されます。
- Spark セッション構成でパーティションの動的な上書きが有効になっていて、
- パーティションの動的な上書きを使用する場合は、
overwriteSchema
をtrue
として指定することはできません。