Delta Lake のテーブル スキーマを更新する

Delta Lake を使用すると、テーブルのスキーマを更新できます。 サポートされている変更の種類は次のとおりです。

  • 新しい列を追加する (任意の位置で)
  • 既存の列を並べ替える
  • 既存の列の名前を変更する

これらの変更は、DDL を使用して明示的に行うか、DML を使用して暗黙的に行うことができます。

重要

Delta テーブル スキーマの更新は、すべての同時差分書き込み操作と競合する操作です。

Delta テーブル スキーマを更新すると、そのテーブルから読み取られたストリームが終了します。 ストリームを継続したい場合は、再起動する必要があります。 推奨される方法については、「Azure Databricks での構造化ストリーミング アプリケーションの運用に関する考慮事項」を参照してください。

スキーマを明示的に更新して列を追加する

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

既定では、null 値の許容は true です。

入れ子になったフィールドに列を追加するには、次のように入力します。

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

たとえば、ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) を実行する前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

後のスキーマは次のようになります。

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Note

入れ子になった列の追加がサポートされているのは構造体の場合だけです。 配列とマップはサポートされていません。

スキーマを明示的に更新して列のコメントまたは順序を変更する

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

入れ子になったフィールドの列を変更するには、次のように入力します。

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

たとえば、ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST を実行する前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

後のスキーマは次のようになります。

- root
| - colA
| - colB
| +-field2
| +-field1

スキーマを明示的に更新して列を置き換える

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

たとえば、次の DDL を実行する場合:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

後のスキーマは次のようになります。

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

スキーマを明示的に更新して列の名前を変更する

Note

この機能は、Databricks Runtime 10.4 LTS 以降で使用できます。

列の既存データを書き直すことなく列の名前を変更するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。

列名を変更するには:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

入れ子になったフィールドの名前を変更するには:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

たとえば、次のコマンドを実行する場合:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

後のスキーマは次のようになります。

- root
| - colA
| - colB
| +-field001
| +-field2

Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。

スキーマを明示的に更新して列を削除する

Note

この機能は、Databricks Runtime 11.3 LTS 以降で使用できます。

データ ファイルを書き換えずに列をメタデータのみの操作として削除するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。

重要

メタデータから列を削除しても、ファイル内の列の基になるデータは削除されません。 削除された列データを消去するには、REORG TABLE を使用してファイルを書き換えます。 その後、VACUUM を使用して、削除された列データを含むファイルを物理的に削除できます。

列を削除するには

ALTER TABLE table_name DROP COLUMN col_name

複数の列を削除するには

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

スキーマを明示的に更新して列の種類または名前を変更する

テーブルを書き直すことで、列の型または名前を変更することも、列を破棄することもできます。 これを行うには、overwriteSchema オプションを使用します。

次の例では、列の種類の変更を示しています。

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

次の例では、列の名前の変更を示しています。

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

スキーマの展開を有効にする

スキーマの展開を有効にするには、次のいずれかの操作を行います。

Databricks では、Spark 構成を設定するのではなく、書き込み操作ごとにスキーマの展開を有効にすることをお勧めします。

書き込み操作でスキーマの展開を有効にするためにオプションまたは構文を使用する場合、これは Spark 構成よりも優先されます。

Note

INSERT INTO ステートメントのスキーマ展開句はありません。

新しい列を追加する書き込みについては、スキーマの展開を有効にする

ソース クエリに存在するがターゲット テーブルにない列は、スキーマの展開を有効にした場合に、書き込みトランザクションの一部として自動的に追加されます。 スキーマ展開を有効にする

新しい列を追加するとき、大文字と小文字が維持されます。 新しい列はテーブル スキーマの末尾に追加されます。 追加の列が構造体内にある場合は、ターゲット テーブルの構造体の末尾に追加されます。

次の例では、自動ローダーで mergeSchema オプションを使用する方法を示します。 「自動ローダー」を参照してください。

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

次の例では、バッチ書き込み操作で mergeSchema オプションを使用する方法を示します。

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Delta Lake マージの自動スキーマの展開

スキーマの展開により、ユーザーはマージ時のターゲット テーブルとソース テーブルのスキーマの不一致を解決できます。 これにより、次の 2 つのケースが処理されます。

  1. ソース テーブルの列がターゲット テーブルに存在しません。 新しい列がターゲット スキーマに追加され、その値がソース値を使用して挿入または更新されます。
  2. ターゲット テーブルの列がソース テーブルに存在しません。 ターゲット スキーマは変更されません。追加のターゲット列の値は、UPDATE の場合は変更されず、INSERT の場合は NULL に設定されます。

スキーマの自動展開を手動で有効にする必要があります。 「スキーマの展開を有効にする」を参照してください。

Note

Databricks Runtime 12.2 LTS 以降では、ソース テーブルに存在する列と構造体フィールドは、挿入または更新の各アクションで、名前で指定できます。 Databricks Runtime 11.3 LTS 以下では、マージを使用したスキーマの展開には、INSERT * または UPDATE SET * アクションのみを使用できます。

Databricks Runtime 13.3 LTS 以降では、map<int, struct<a: int, b: int>> などのマップ内に入れ子になった構造体でスキーマの展開を使用できます。

マージ用のスキーマ展開構文

Databricks Runtime 15.2 以降では、SQL あるいは Delta Table API を使用して、マージ ステートメントでスキーマの展開を指定できます。

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

スキーマの展開によるマージの操作例

スキーマの展開がある場合とない場合の merge 操作の効果を、次の例に示します。

[列] クエリ (SQL の場合) スキーマ展開のない動作 (既定) スキーマ展開のある動作
ターゲット列: key, value

ソース列: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
テーブル スキーマは変更されません。列 keyvalue のみが更新または挿入されます。 テーブル スキーマが (key, value, new_value) に変更されます。 一致する既存のレコードは、ソースの valuenew_value を使用して更新されます。 スキーマ (key, value, new_value) と共に新しい行が挿入されます。
ターゲット列: key, old_value

ソース列: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
UPDATEINSERT アクションは、ターゲット列 old_value がソース内にないため、エラーをスローします。 テーブル スキーマが (key, old_value, new_value) に変更されます。 一致する既存のレコードは、old_value は変更されずに、ソースの new_value を使用して更新されます。 新しいレコードは、指定した keynew_value (old_value には NULL) を使用して挿入されます。
ターゲット列: key, old_value

ソース列: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE は、ターゲット テーブルに列 new_value が存在しないため、エラーをスローします。 テーブル スキーマが (key, old_value, new_value) に変更されます。 一致する既存のレコードは、old_value は変更されずに、ソースの new_value を使用して更新されます。不一致のレコードでは、new_valueNULL が入力されます。 注 (1) を参照。
ターゲット列: key, old_value

ソース列: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT は、ターゲット テーブルに列 new_value が存在しないため、エラーをスローします。 テーブル スキーマが (key, old_value, new_value) に変更されます。 新しいレコードは、指定した keynew_value (old_value には NULL) を使用して挿入されます。 既存のレコードでは、old_value は変更されず、new_valueNULL が入力されます。 注 (1) を参照。

(1) この動作は、Databricks Runtime 12.2 LTS 以降で使用できます。Databricks Runtime 11.3 LTS 以下では、この条件はエラーになります。

Delta Lake マージを使用して列を除外する

Databricks Runtime 12.2 LTS 以降では、マージ条件で EXCEPT 句を使用して列を明示的に除外できます。 EXCEPT キーワードの動作は、スキーマの展開が有効になっているかどうかによって異なります。

スキーマの展開を無効にすると、EXCEPT キーワードがターゲット テーブル内の列のリストに適用され、UPDATE アクションまたは INSERT アクションから列を除外できます。 除外された列は null に設定されます。

スキーマの展開を有効にすると、EXCEPT キーワードがソース テーブル内の列のリストに適用され、スキーマの展開から列を除外できます。 ターゲット内に存在しないソース内の新しい列は、EXCEPT 句にリストされている場合、ターゲット スキーマに追加されません。 ターゲットに既に存在する除外された列は、null に設定されます。

この構文の例を次に示します。

クエリ (SQL の場合) スキーマ展開のない動作 (既定) スキーマ展開のある動作
ターゲット列: id, title, last_updated

ソース列: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
一致した行は、last_updated フィールドを現在の日付に設定して更新されます。 idtitle の値を使用して、新しい行が挿入されます。 除外されたフィールド last_updatednull に設定されます。 フィールド review はターゲット内にないため無視されます。 一致した行は、last_updated フィールドを現在の日付に設定して更新されます。 スキーマが展開され、フィールド review が追加されます。 null に設定されている last_updated を除き、すべてのソース フィールドを使用して新しい行が挿入されます。
ターゲット列: id, title, last_updated

ソース列: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT は、ターゲット テーブルに列 internal_count が存在しないため、エラーをスローします。 一致した行は、last_updated フィールドを現在の日付に設定して更新されます。 review フィールドはターゲット テーブルに追加されますが、internal_count フィールドは無視されます。 新しく挿入された行の last_updatednull に設定されています。

スキーマ更新での NullType 列の処理

Parquet では NullType がサポートされていないため、Delta テーブルへの書き込み時に DataFrame から NullType 列が破棄されますが、スキーマにはまだ格納されています。 その列に対して別のデータ型を受け取ったとき、Delta Lake はスキーマを新しいデータ型にマージします。 Delta Lake が既存の列に対して NullType を受け取ると、古いスキーマは保持され、新しい列は書き込み中に破棄されます。

ストリーミングにおける NullType がサポートされていません。 ストリーミングを使用するときはスキーマを設定する必要があるため、これは非常にまれです。 NullTypeArrayTypeMapType などの複合型でも受け入れられません。

テーブル スキーマの置換

既定では、テーブル内のデータを上書きしてもスキーマは上書きされません。 replaceWhere のない mode("overwrite") を使用して テーブルを上書きするとき、書き込まれているデータのスキーマを上書きすることもできます。 テーブルのスキーマとパーティション分割を置き換えるには、overwriteSchema オプションを true に設定します。

df.write.option("overwriteSchema", "true")

重要

動的パーティションの上書きを使用する場合は、overwriteSchematrue として指定することはできません。