APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化

Delta Live Tables で APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT API を使うと、変更データ キャプチャ (CDC) が簡単になります。 使用するインターフェイスは、変更データのソースによって異なります。

  • 変更データ フィード (CDF) からの変更を処理するには、APPLY CHANGES を使用します。
  • データベース スナップショットの変更を処理するには、APPLY CHANGES FROM SNAPSHOT (パブリック プレビュー) を使用します。

これまで、Azure Databricks で CDC レコードを処理するには、MERGE INTO ステートメントが一般に使われていました。 しかし、MERGE INTO では、順序が正しくないレコードのために誤った結果が生成される場合や、レコードを並べ替えるために複雑なロジックが必要になる場合があります。

APPLY CHANGES API は、Delta Live Tables SQL および Python インターフェイスでサポートされています。 APPLY CHANGES FROM SNAPSHOT API は、Delta Live Tables Python インターフェイスでサポートされています。

APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT の両方で、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新がサポートされます。

  • レコードを直接更新するには、SCD タイプ 1 を使用します。 更新されたレコードの履歴は保持されません。
  • SCD タイプ 2 を使用して、すべての更新または指定された列のセットの更新時にレコードの履歴を保持します。

構文とその他の参考資料については、以下を参照してください。

Note

この記事では、ソース データの変更に基づいて Delta Live Tables パイプラインのテーブルを更新する方法について説明します。 Delta テーブルの行レベルの変更情報を記録およびクエリする方法については、「Azure Databricks で Delta Lake 変更データ フィードを使用する」をご覧ください。

要件

CDC API を使用するには、サーバーレス DLT パイプラインあるいは Delta Live Tables の Pro または Advanced エディションを使用するようにパイプラインを構成する必要があります。

APPLY CHANGES API ではどのように CDC が実装されますか?

Delta Live Tables の APPLY CHANGES API を使うと、正しくない順序のレコードが自動的に処理されるので、CDC レコードの正しい処理が保証され、正しくない順序のレコードを処理するために複雑なロジックを開発する必要がなくなります。 レコードのシーケンス処理に使用するソース データ内の列を指定する必要があります。Delta Live Tables は、ソース データの適切な順序付けを単調増加する表現として解釈します。 Delta Live Tables は、順不同で到着したデータを自動的に処理します。 SCD タイプ 2 の変更の場合、Delta Live Tables では、ターゲット テーブルの __START_AT および __END_AT 列に適切なシーケンス値が反映されます。 各シーケンス値ではキーごとに 1 つの個別の更新が必要であり、NULL シーケンス値はサポートされていません。

APPLY CHANGES で CDC 処理を実行するには、まずストリーミング テーブルを作成してから、SQL の APPLY CHANGES INTO ステートメントまたは Python の apply_changes() 関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 ターゲット ストリーミング テーブルを作成するには、SQL の CREATE OR REFRESH STREAMING TABLE ステートメントまたは Python の create_streaming_table() 関数を使用します。 SCD タイプ 1 とタイプ 2 の処理の例を参照してください。

構文の詳細については、Delta Live Tables の「SQL リファレンス」または「Python リファレンス」を参照してください。

APPLY CHANGES FROM SNAPSHOT API ではどのように CDC が実装されますか?

重要

APPLY CHANGES FROM SNAPSHOT API はパブリック プレビュー段階です。

APPLY CHANGES FROM SNAPSHOT は、一連の順番どおりのスナップショットを比較してソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。 APPLY CHANGES FROM SNAPSHOT は、Delta Live Tables Python インターフェイスでのみサポートされます。

APPLY CHANGES FROM SNAPSHOT では、複数のソースの種類からのスナップショットの取り込みがサポートされています。

  • 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットを取り込みます。 APPLY CHANGES FROM SNAPSHOT には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするシンプルで合理化されたインターフェイスがあります。 パイプラインの更新ごとに新しいスナップショットが取り込まれ、取り込み時間がスナップショット バージョンとして使用されます。 パイプラインを連続モードで実行すると、APPLY CHANGES FROM SNAPSHOT 処理を含むフローのトリガー間隔設定によって決定された期間に、パイプラインの更新ごとに複数のスナップショットが取り込まれます。
  • 履歴スナップショット インジェストを使用して、Oracle または MySQL データベースあるいはデータ ウェアハウスから生成されたスナップショットなど、データベース スナップショットを含むファイルを処理します。

APPLY CHANGES FROM SNAPSHOT を使って任意のソースの種類から CDC 処理を実行するには、まずストリーミング テーブルを作成してから、Python で apply_changes_from_snapshot() 関数を使用して、処理を実装するために必要なスナップショット、キー、およびその他の引数を指定します。 定期的なスナップショット インジェスト履歴スナップショット インジェストの例を参照してください。

API に渡されるスナップショットは、バージョン別の昇順である必要があります。 Delta Live Tables で順序が正しくないスナップショットが検出されると、エラーがスローされます。

構文の詳細については、Delta Live Tables の「Python リファレンス」を参照してください。

制限事項

APPLY CHANGESAPPLY CHANGES FROM SNAPSHOT クエリのターゲットは、ストリーミング テーブルのソースとして使用できません。 APPLY CHANGES または APPLY CHANGES FROM SNAPSHOT クエリのターゲットから読み取るテーブルは、具体化されたビューである必要があります。

例: CDF ソース データを使用した SCD タイプ 1 と SCD タイプ 2 の処理

次のセクションでは、以下を行う変更データ フィードからのソース イベントに基づいてターゲット テーブルを更新する Delta Live Tables SCD タイプ 1 とタイプ 2 のクエリの例を示します。

  1. 新しいユーザー レコードを作成します。
  2. ユーザー レコードを削除します。
  3. ユーザー レコードを更新します。 SCD タイプ 1 の例では、最後の UPDATE 操作が遅れて到着し、ターゲット テーブルからドロップされます。これは順不同のイベントの処理を示しています。

次の例は、Delta Live Tables パイプラインの構成と更新に精通していることを前提としています。 「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。

これらの例を実行するには、サンプル データセットを作成することから始める必要があります。 「テスト データを生成する」を参照してください。

これらの例の入力レコードは以下の通りです。

userId name city operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

サンプル データの最後の行のコメントを解除すると、レコードを切り捨てる場所を指定する次のレコードが挿入されます。

userId name city operation sequenceNum
null null null TRUNCATE 3

Note

次のすべての例には、DELETETRUNCATE の両方の操作を指定するオプションが含まれていますが、それぞれ省略可能です。

SCD タイプ 1 の更新を処理する

次の例は、SCD タイプ 1 の更新の処理を示しています。

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

SCD タイプ 1 の例でクエリを実行した後、ターゲット テーブルには次のレコードが含まれます。

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

追加の TRUNCATE レコードで SCD タイプ 1 の例を実行すると、124 および 126sequenceNum=3 での TRUNCATE 操作のためにレコードが切り捨てられ、ターゲット テーブルに次のレコードが含まれます。

userId name city
125 Mercedes Guadalajara

SCD タイプ 2 の更新を処理する

次の例は、SCD タイプ 2 の更新の処理を示しています。

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

SCD タイプ 2 の例でクエリを実行した後、ターゲット テーブルには次のレコードが含まれます。

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 null

SCD タイプ 2 クエリでは、ターゲット テーブル内の履歴に対して、追跡する出力列のサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、上書きでレコードが更新されます。 次の例では、city 列の追跡からの除外を示します。

次の例は、SCD タイプ 2 で履歴追跡を使用する方法を示しています。

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

この例を追加の TRUNCATE レコードなしで実行した後、そのターゲット テーブルには次のレコードが含まれます。

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

テスト データを生成する

次のコードは、このチュートリアルにあるサンプル クエリで使用するサンプル データセットを生成する目的で提供されています。 新しいスキーマを作成し、新しいテーブルを作成するための適切な資格情報を持っていれば、ノートブックまたは Databricks SQL を使用して、これらのステートメントを実行できます。 次のコードは、Delta Live Tables パイプラインの一部として実行することを意図したものではありません。

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

例: 定期的なスナップショットの処理

次の例は、mycatalog.myschema.mytable に格納されているテーブルのスナップショットを取り込む SCD タイプ 2 の処理を示しています。 処理の結果は、target という名前のテーブルに書き込まれます。

タイムスタンプ 2024-01-01 00:00:00 の mycatalog.myschema.mytable レコード

Key
1 a1
2 a2

タイムスタンプ 2024-01-01 12:00:00 の mycatalog.myschema.mytable レコード

Key Value
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。

Key Value __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

例: 履歴スナップショットの処理

次の例は、クラウド ストレージ システムに格納されている 2 つのスナップショットからのソース イベントに基づいてターゲット テーブルを更新する SCD タイプ 2 の処理を示しています。

/<PATH>/filename1.csv に格納されている timestamp のスナップショット

キー TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

/<PATH>/filename2.csv に格納されている timestamp + 5 のスナップショット

キー TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

次のコード例は、これらのスナップショットを使用した SCD タイプ 2 の更新の処理を示しています。

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。

キー TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

ターゲット ストリーミング テーブルのデータを追加、変更、または削除する

パイプラインがテーブルを Unity Catalog に発行する場合は、挿入ステートメント、更新ステートメント、削除ステートメント、マージ ステートメントなどのデータ操作言語 (DML) ステートメントを使用して、APPLY CHANGES INTO ステートメントによって作成されたターゲット ストリーミング テーブルを変更することができます。

Note

  • ストリーミング テーブルのテーブル スキーマを変更する DML ステートメントはサポートされていません。 DML ステートメントがテーブル スキーマの進化を試みないことを確認します。
  • ストリーミング テーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以降を使用する共有 Unity Catalog クラスターまたは SQL ウェアハウスでのみ実行できます。
  • ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグを設定します。 skipChangeCommits が設定されていれば、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。 処理でストリーミング テーブルが必要ない場合は、(追加専用の制限がない) 具体化されたビューをターゲット テーブルとして使用できます。

Delta Live Tables は指定された SEQUENCE BY 列を使用し、ターゲット テーブルの __START_AT 列と __END_AT 列に適切なシーケンス値を反映させるため (SCD タイプ 2 の場合)、DML ステートメントがこれらの列に有効な値を使用して、レコードの適切な順序付けを維持する必要があります。 「APPLY CHANGES API ではどのように CDC が実装されますか?」を参照してください。

ストリーミング テーブルで DML ステートメントを使用する方法の詳細については、「ストリーミング テーブルのデータを追加、変更、または削除する」を参照してください。

次の例では、開始シーケンスが 5 のアクティブ なレコードを挿入します。

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Delta Live Tables CDC クエリで処理されたレコードに関するデータを取得する

Note

次のメトリックは、APPLY CHANGES FROM SNAPSHOT クエリではなく、APPLY CHANGES クエリによってのみキャプチャされます。

次のメトリックは APPLY CHANGES クエリによってキャプチャされます。

  • num_upserted_rows: 更新中にそのデータセットにアップサートされた出力行の数。
  • num_deleted_rows: 更新中にそのデータセットから削除された既存の出力行の数。

非 CDC フローの出力である num_output_rows メトリックは、apply changes のクエリではキャプチャされません。

Delta Live Tables の CDC 処理には、どのようなデータ オブジェクトが使用されますか?

注: 次のデータ構造は、APPLY CHANGES FROM SNAPSHOT 処理ではなく、APPLY CHANGES 処理にのみ適用されます。

Hive メタストアでターゲット テーブルを宣言すると、次の 2 つのデータ構造が作成されます。

  • ターゲット テーブルに割り当てられた名前を使用するビュー。
  • Delta Live Tables が CDC 処理の管理に使用する内部バッキング テーブル。 このテーブルには、ターゲット テーブル名の前に __apply_changes_storage_ を付加した名前が付けられます。

たとえば、dlt_cdc_target という名前のターゲット テーブルを宣言すると、メタストア内に dlt_cdc_target という名前のビューと __apply_changes_storage_dlt_cdc_target という名前のテーブルが表示されます。 ビューを作成すると、Delta Live Tables は、順不同のデータの処理に必要な追加情報 (廃棄標識やバージョンなど) を除外できます。 処理されたデータを表示するには、ターゲット ビューに対してクエリを実行します。 __apply_changes_storage_ テーブルのスキーマは、将来の機能や機能強化をサポートするために変更される可能性があるため、運用環境向けのテーブルに対してクエリを実行しないでください。 テーブルにデータを手動で追加すると、バージョン列がないため、レコードは他の変更の前にあると見なされます。

パイプラインで Unity Catalog に発行する場合、ユーザーは内部バッキング テーブルにアクセスできません。