Azure Databricks でテーブルを複製する

clone コマンドを使用して、特定のバージョンの既存 Delta Lake テーブルのコピーを Azure Databricks に作成できます。 複製は、ディープまたはシャローとすることができます。

Azure Databricks では、Parquet テーブルと Iceberg テーブルの複製もサポートされています。 「Parquet テーブルと Iceberg テーブルを Delta Lake に増分複製する」を参照してください。

Unity Catalog でクローンを使用する方法の詳細については、「Unity Catalog テーブルのシャロー クローン」を参照してください。

Note

Databricks では、Delta Sharing を使って、さまざまな組織のテーブルへの読み取り専用アクセスを提供することが推奨されています。 「Delta Sharing とは」を参照してください。

複製の種類

  • "ディープ複製" は、既存テーブルのメタデータに加えて、ソース テーブルのデータを複製先にコピーする複製です。 さらに、ストリーム メタデータも複製されるため、Delta テーブルに書き込むストリームをソース テーブルで停止し、中断した場所から複製先で続行することができます。
  • "シャロー複製" は、データ ファイルが複製先にコピーされない複製です。 テーブルのメタデータは、ソースと同じです。 これらの複製の方が、作成するコストは安くなります。

複製されるメタデータには、スキーマ、パーティション分割情報、インバリアント、NULL 値の許容が含まれます。 ディープ複製の場合のみ、ストリームと COPY INTO のメタデータも複製されます。 複製されないメタデータは、テーブルの説明とユーザー定義のコミット メタデータです。

差分クローン操作のセマンティクスとは

Hive メタストアに登録されている差分テーブル、またはテーブルとして登録されていないファイルのコレクションを操作している場合、クローンには次のセマンティクスがあります。

重要

Databricks Runtime 13.3 LTS 以降では、Unity Catalog マネージド テーブルでシャロー クローンがサポートされます。 Unity Catalog テーブルのクローン セマンティクスは、他の環境の Delta Lake クローン セマンティクスと大きく異なります。 「Unity Catalog テーブルのシャロー クローン」を参照してください。

  • ディープまたはシャロー複製に加えられたすべての変更は、ソース テーブルではなく複製自体にのみ影響します。
  • シャロー複製は、ソース ディレクトリ内のデータ ファイルを参照します。 ソース テーブルで vacuum を実行すると、クライアントは参照されているデータ ファイルを読み取ることができなくなり、FileNotFoundException がスローされます。 この場合、シャロー複製に対する置換を指定して複製を実行すると、複製が修復されます。 これが頻繁に発生する場合は、代わりに、ソース テーブルに依存しないディープ複製の使用を検討してください。
  • ディープ複製は、複製元のソースに依存しませんが、ディープ複製はデータとメタデータをコピーするため、作成にコストがかかります。
  • replace を使用してターゲットに複製し、そのパスにテーブルが既に存在しているとき、そのパスに Delta ログが存在しない場合は作成されます。 vacuum を実行することで、既存のデータをクリーンアップできます。
  • 既存の Delta テーブルの場合は、ソース テーブルから新しいメタデータと新しいデータを含む新しいコミットが作成されます。 この新しいコミットは増分です。つまり、最後の複製以降に加えられた新しい変更だけがテーブルにコミットされます。
  • テーブルを複製することは、Create Table As Select または CTAS と同じではありません。 複製によって、ソース テーブルのデータに加えてメタデータもコピーされます。 複製は構文が簡単でもあります。パーティション分割、形式、インバリアント、NULL 値の許容などはソース テーブルから取得されるため、指定する必要がありません。
  • 複製されたテーブルには、ソース テーブルとは独立した履歴があります。 複製されたテーブルに対するタイム トラベル クエリは、ソース テーブルに対して機能するのと同じ入力では、機能しません。

クローン構文の例

次のコード例は、ディープ クローンとシャロー クローンを作成するための構文を示しています。

SQL

CREATE TABLE target_table CLONE source_table; -- Create a deep clone of source_table as target_table

CREATE OR REPLACE TABLE target_table CLONE source_table; -- Replace the target

CREATE TABLE IF NOT EXISTS target_table CLONE source_table; -- No-op if the target table exists

CREATE TABLE target_table SHALLOW CLONE source_table;

CREATE TABLE target_table SHALLOW CLONE source_table VERSION AS OF version;

CREATE TABLE target_table SHALLOW CLONE source_table TIMESTAMP AS OF timestamp_expression; -- timestamp can be like “2019-01-01” or like date_sub(current_date(), 1)

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "source_table")

deltaTable.clone(target="target_table", isShallow=True, replace=False) # clone the source at latest version

deltaTable.cloneAtVersion(version=1, target="target_table", isShallow=True, replace=False) # clone the source at a specific version

# clone the source at a specific timestamp such as timestamp="2019-01-01"
deltaTable.cloneAtTimestamp(timestamp="2019-01-01", target="target_table", isShallow=True, replace=False)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "source_table")

deltaTable.clone(target="target_table", isShallow=true, replace=false) // clone the source at latest version

deltaTable.cloneAtVersion(version=1, target="target_table", isShallow=true, replace=false) // clone the source at a specific version

deltaTable.cloneAtTimestamp(timestamp="2019-01-01", target="target_table", isShallow=true, replace=false) // clone the source at a specific timestamp

構文の詳細については、「CREATE TABLE CLONE」を参照してください。

複製のメトリック

CLONE 操作が完了すると、次のメトリックが単一行データフレームとして報告されます。

  • source_table_size: 複製されるソース テーブルのサイズ (バイト単位)。
  • source_num_of_files: ソース テーブル内のファイルの数。
  • num_removed_files: テーブルが置き換えられる場合、現在のテーブルから削除されるファイルの数。
  • num_copied_files: ソースからコピーされたファイルの数 (シャロー複製の場合は 0)。
  • removed_files_size: 現在のテーブルから削除されるファイルのサイズ (バイト単位)。
  • copied_files_size: テーブルにコピーされるファイルのサイズ (バイト単位)。

複製のメトリックのサンプル

アクセス許可

Azure Databricks テーブル アクセス制御とクラウド プロバイダーのアクセス許可を構成する必要があります。

テーブルのアクセス制御

ディープ複製とシャロー複製には、次のアクセス許可が必要です。

  • ソース テーブルに対する SELECT アクセス許可。
  • CLONE を使用して新しいテーブルを作成する場合は、テーブル作成先のデータベースに対する CREATE アクセス許可。
  • CLONE を使用してテーブルを置き換える場合は、テーブルに対する MODIFY アクセス許可が必要です。

クラウド プロバイダーのアクセス許可

ディープ複製を作成した場合は、ディープ複製を読み取るすべてのユーザーに、複製のディレクトリに対する読み取りアクセス権が必要です。 複製に変更を加えるには、ユーザーは複製のディレクトリに対する書き込みアクセス権を持っている必要があります。

シャロー複製を作成した場合、シャロー複製を読み取るすべてのユーザーに、元のテーブル内のファイルを読み取るためのアクセス許可が必要です。これは、シャロー複製ではデータ ファイルが複製のディレクトリだけでなく、ソース テーブルにも残っているためです。 複製に変更を加えるには、ユーザーに複製のディレクトリに対する書き込みアクセス権が必要です。

データのアーカイブに複製を使用する

ディープ クローンを使用して、アーカイブを目的として特定の時点におけるテーブルの状態を保存できます。 ディザスター リカバリーを目的としてソース テーブルの更新された状態を維持するために、ディープ クローンを増分的に同期できます。

-- Every month run
CREATE OR REPLACE TABLE archive_table CLONE my_prod_table

ML モデルの再現に複製を使用する

機械学習を行う場合は、ML モデルをトレーニングした特定のバージョンのテーブルをアーカイブすることができます。 将来のモデルは、このアーカイブしたデータ セットを使用してテストできます。

-- Trained model on version 15 of Delta table
CREATE TABLE model_dataset CLONE entire_dataset VERSION AS OF 15

実稼働テーブルでの短期実験に複製を使用する

テーブルを破損させずに、実稼働テーブルでワークフローをテストするには、シャロー複製を簡単に作成できます。 これにより、複製されたテーブルに対して、すべての運用データを含むが運用ワークロードには影響しない、任意のワークフローを実行できます。

-- Perform shallow clone
CREATE OR REPLACE TABLE my_test SHALLOW CLONE my_prod_table;

UPDATE my_test WHERE user_id is null SET invalid=true;
-- Run a bunch of validations. Once happy:

-- This should leverage the update information in the clone to prune to only
-- changed files in the clone if possible
MERGE INTO my_prod_table
USING my_test
ON my_test.user_id <=> my_prod_table.user_id
WHEN MATCHED AND my_test.user_id is null THEN UPDATE *;

DROP TABLE my_test;

複製を使用してテーブルのプロパティをオーバーライドする

テーブル プロパティのオーバーライドは、次の場合に特に便利です。

  • さまざまな部署でデータを共有するときに、テーブルに所有者またはユーザー情報の注釈を付ける。
  • 差分テーブルとテーブル履歴またはタイム トラベルのアーカイブが必要です。 アーカイブ テーブルに対して個別にデータとログの保持期間を指定できます。 次に例を示します。

SQL

CREATE OR REPLACE TABLE archive_table CLONE prod.my_table
TBLPROPERTIES (
delta.logRetentionDuration = '3650 days',
delta.deletedFileRetentionDuration = '3650 days'
)

Python

dt = DeltaTable.forName(spark, "prod.my_table")
tblProps = {
"delta.logRetentionDuration": "3650 days",
"delta.deletedFileRetentionDuration": "3650 days"
}
dt.clone(target="archive_table", isShallow=False, replace=True, tblProps)

Scala

val dt = DeltaTable.forName(spark, "prod.my_table")
val tblProps = Map(
"delta.logRetentionDuration" -> "3650 days",
"delta.deletedFileRetentionDuration" -> "3650 days"
)
dt.clone(target="archive_table", isShallow = false, replace = true, properties = tblProps)