Azure Databricks で Delta Lake 変更データ フィードを使用する

Azure Databricks で変更データ フィードを使うと、Delta テーブルのバージョン間で行レベルの変更を追跡できます。 Delta テーブルで有効になっていると、ランタイムによって、テーブルに書き込まれたすべてのデータの "変更イベント" が記録されます。 これには、行データと、指定された行が挿入、削除、または更新されたかどうかを示すメタデータが含まれます。

重要

変更データ フィードはテーブル履歴と連携して動作し、変更情報を提供します。 Delta テーブルを複製すると別の履歴が作成されるため、複製されたテーブルの変更データ フィードは元のテーブルの変更データ フィードと一致しません。

変更データを段階的に処理する

Databricks では、変更データ フィードを Structured Streaming と組み合わせて使用して、Delta テーブルからの変更を増分処理することをお勧めします。 テーブルの変更データ フィードのバージョンを自動的に追跡するには、Azure Databricks の Structured Streaming を使用する必要があります。

Note

Delta Live Tables は、変更データを簡単に伝達し、SCD (緩やかに変化するディメンション) タイプ 1 またはタイプ 2 のテーブルとして結果を保存するための機能を提供します。 APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化に関する記事を参照してください。

テーブルから変更データ フィードを読み取るために、そのテーブルで変更データ フィードを有効にする必要があります。 「データ フィードの変更を有効にする」を参照してください。

次の構文例に示すように、変更データ フィードを読み取るようにテーブルに対してストリームを構成する場合に、オプション readChangeFeedtrue に設定します。

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

既定では、ストリームは、ストリームが最初に開始されたときにテーブルの最新のスナップショットを INSERT として返し、今後の変更は変更データとして返します。

変更データは Delta Lake トランザクションの一部としてコミットされ、新しいデータがテーブルにコミットされるのと同時に利用可能になります。

必要に応じて、開始バージョンを指定できます。 「開始バージョンを指定する必要がありますか?」を参照してください。

変更データ フィードではバッチ実行もサポートされており、開始バージョンを指定する必要があります。 「バッチ クエリの変更を読み取る」を参照してください。

転送率の制限 (maxFilesPerTriggermaxBytesPerTrigger) や excludeRegex などのオプションは、変更データの読み取り時にもサポートされます。

レート制限は、スナップショットの開始バージョン以外のバージョンではアトミックにすることができます。 つまり、コミット バージョン全体に対してレート制限が適用されるか、コミット全体が返されます。

開始バージョンを指定する必要がありますか?

特定のバージョンより前に発生した変更を無視する場合は、必要に応じて開始バージョンを指定できます。 Delta トランザクション ログに記録されたタイムスタンプまたはバージョン ID 番号を使用して、バージョンを指定できます。

Note

バッチ読み取りには開始バージョンが必要であり、多くのバッチ パターンではオプションの終了バージョンを設定すると便利です。

変更データ フィードを含む Structured Streaming ワークロードを構成する場合は、開始バージョンの指定が処理にどのように影響するかを理解することが重要です。

多くのストリーミング ワークロード (特に新しいデータ処理パイプライン) は、既定の動作によるメリットがあります。 既定の動作では、最初のバッチは、最初にストリームがテーブル内のすべての既存のレコードを変更データ フィードの INSERT 操作として記録するときに処理されます。

ターゲット テーブルに、特定の時点までの適切な変更を含むすべてのレコードが既に含まれている場合は、ソース テーブルの状態を INSERT イベントとして処理しないように、開始バージョンを指定します。

次の例は、チェックポイントが破損したストリーミング障害から回復する構文です。 この例では、次のような条件を想定します。

  1. テーブル作成時にソース テーブルで変更データ フィードが有効になりました。
  2. ターゲット ダウンストリーム テーブルは、バージョン 75 以下のすべての変更を処理しました。
  3. ソース テーブルのバージョン履歴は、バージョン 70 以降で使用できます。

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

この例では、新しいチェックポイントの場所も指定する必要があります。

重要

開始バージョンを指定した場合、開始バージョンがテーブル履歴に存在しなくなると、ストリームは新しいチェックポイントから開始することができなくなります。 Delta Lake は履歴バージョンを自動的にクリーンアップします。つまり、指定されたすべての開始バージョンが最終的に削除されます。

変更データ フィードを使用してテーブルの履歴全体を再生できますか?」を参照してください。

バッチ クエリの変更を読み取る

バッチ クエリ構文を使用すると、特定のバージョンから始まるすべての変更を読み取ったり、指定したバージョン範囲内の変更を読み取ったりすることができます。

バージョンを整数として指定し、タイムスタンプを形式 yyyy-MM-dd[ HH:mm:ss[.SSS]] の文字列として指定します。

クエリには、開始および終了バージョンが含まれます。 特定の開始バージョンから最新バージョンのテーブルへの変更を読み取るには、開始バージョンのみを指定します。

変更イベントが記録されているバージョンより古いバージョンまたはタイムスタンプを指定した場合、つまり変更データ フィードが有効になっている場合は、変更データ フィードが有効になっていないことを示すエラーがスローされます。

次の構文例は、バッチ読み取りでバージョンの開始および終了バージョン オプションを使用する方法を示しています。

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Note

既定では、ユーザーがテーブルの最後のコミットを超えるバージョンまたはタイムスタンプを渡すと、エラー timestampGreaterThanLatestCommit がスローされます。 Databricks Runtime 11.3 LTS 以降では、ユーザーが次の構成を true に設定している場合、変更データ フィードは範囲外のバージョンのケースを処理できます。

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

テーブルの最後のコミットより新しい開始バージョン、またはテーブルの最後のコミットより新しい開始タイムスタンプを指定した場合、上記の構成が有効になっていると、空の読み取り結果が返されます。

テーブルの最後のコミットより新しい開始バージョン、またはテーブルの最後のコミットより新しい開始タイムスタンプを指定した場合、前述の構成がバッチ読み取りモードで有効になっている時、開始バージョンと最後のコミットの間のすべての変更が 返されます。

変更データ フィードのスキーマとは

テーブルの変更データ フィードから読み取る際には、最新のテーブル バージョンのスキーマが使用されます。

注意

スキーマの変更と進化のほとんどの操作は、完全にサポートされています。 列マッピングが有効になっているテーブルでは、一部のユース ケースがサポートされておらず、異なる動作が示されます。 「列マッピングが有効になっているテーブルの変更データ フィードの制限事項」を参照してください。

変更データ フィードには、Delta テーブルのスキーマのデータ列に加えて、変更イベントの種類を識別するメタデータ列が含まれています。

列名 Type
_change_type String insertupdate_preimageupdate_postimagedelete (1)
_commit_version long 変更を含むデルタ ログまたはテーブルのバージョン。
_commit_timestamp Timestamp コミットが作成されたときに関連付けられたタイムスタンプ。

(1) preimage は更新前の値であり、postimage は更新後の値です。

注意

追加された列と同じ名前の列がスキーマに含まれている場合、テーブルで変更データ フィードを有効にすることはできません。 変更データ フィードを有効にする前に、テーブル内の列の名前を変更してこの競合を解決してください。

変更データ フィードを有効にする

読み取ることができるのは、有効なテーブルの変更データ フィードのみです。 次のいずれかの方法を使用して、データ フィードの変更オプションを明示的に有効にする必要があります。

  • 新しいテーブル: テーブルのプロパティ delta.enableChangeDataFeed = true を、CREATE TABLE コマンドで設定します。

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 既存のテーブル: テーブルのプロパティ delta.enableChangeDataFeed = true を、ALTER TABLE コマンドで設定します。

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • すべての新しいテーブル:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

重要

変更データ フィードを有効にした後に加えられた変更のみが記録されます。 テーブルに対する過去の変更はキャプチャされません。

データ ストレージを変更する

変更データ フィードを有効にすると、テーブルのストレージ コストが少し増加します。 変更データ レコードはクエリの実行時に生成され、通常、書き換えられたファイルの合計サイズよりもはるかに小さくなります。

Azure Databricks によって、UPDATEDELETEMERGE 操作に対する変更データが、テーブル ディレクトリの下の _change_data フォルダーに記録されます。 挿入のみの操作やパーティションの完全な削除などの一部の操作では、Azure Databricks がトランザクション ログから直接変更データ フィードを効率的に計算できるため、_change_data ディレクトリにデータは生成されません。

_change_data フォルダー内のデータ ファイルに対するすべての読み取りは、サポートされている Delta Lake API を経由する必要があります。

_change_data フォルダー内のファイルは、テーブルの保持ポリシーに従います。 変更データ フィードのデータは、VACUUM コマンドの実行時に削除されます。

変更データ フィードを使用してテーブルの履歴全体を再生できますか?

変更データ フィードは、テーブルに対するすべての変更の永続的なレコードとして機能することは意図していません。 変更データ フィードは、有効になった後に発生した変更のみを記録します。

変更データ フィードと Delta Lake を使用すると、ソース テーブルの完全なスナップショットを常に再構築できます。つまり、変更データ フィードが有効になっているテーブルに対して新しいストリーミング読み取りを開始し、そのテーブルの現在のバージョンとその後に発生するすべての変更をキャプチャできます。

変更データフィードのレコードは一時的なものであり、指定された保存期間中しかアクセスできないものとして扱う必要があります。 Delta トランザクション ログは、テーブルのバージョンとそれに対応する変更データ フィードのバージョンを定期的に削除します。 トランザクション ログからバージョンが削除されると、そのバージョンの変更データ フィードを読み取れなくなります。

ユース ケースでテーブルに対するすべての変更の永続的な履歴を保持する必要がある場合は、増分ロジックを使用して変更データ フィードから新しいテーブルにレコードを書き込む必要があります。 次のコード例では、Structured Streaming の増分処理を活用しながら、使用可能なデータをバッチ ワークロードとして処理する trigger.AvailableNow の使用を示します。 このワークロードをメインの処理パイプラインで非同期的にスケジュールして、監査または完全な再生のために変更データ フィードのバックアップを作成できます。

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

列マッピングが有効になっているテーブルの変更データ フィードの制限事項

Delta テーブルで列マッピングが有効になっていると、既存データのデータ ファイルを書き換えることなく、テーブル内の列を削除または名前変更できます。 列マッピングが有効な場合、列の名前変更や削除、データ型の変更、NULL 値の許容の変更といった追加処理のないスキーマ変更を行うと、変更データ フィードに制限事項が生じます。

重要

  • バッチセマンティクスを使用して追加処理のないスキーマ変更が発生したトランザクションまたは範囲の変更データ フィードを読み取ることはできません。
  • Databricks Runtime 12.2 LTS 以前では、列マッピングが有効になっているテーブルで追加処理のないスキーマ変更が発生した場合、変更データ フィードに対するストリーム読み取りはサポートされません。 「列マッピングとスキーマの変更が伴うストリーミング」を参照してください。
  • Databricks Runtime 11.3 LTS 以前では、列マッピングが有効になっているテーブルで列の名前変更または削除が発生した場合、そのテーブルの変更データ フィードを読み取ることはできません。

Databricks Runtime 12.2 LTS 以降では、列マッピングが有効になっているテーブルで追加処理のないスキーマ変更が発生した場合、そのテーブルの変更データ フィードに対してバッチ読み取りを実行できます。 読み取り操作では、テーブルの最新バージョンのスキーマを使用する代わりに、クエリで指定されたテーブルの最終バージョンのスキーマが使用されます。 指定されたバージョン範囲に追加処理のないスキーマ変更が含まれる場合、クエリは失敗します。