Delta Sharing オープン共有 (受信者用) を使用して共有されたデータを読み取る
この記事では、Delta Sharing "オープン共有" プロトコルを使用した共有データを読み取る方法について説明します。 これには、Databricks、Apache Spark、pandas、Power BI、Tableau を使用して共有データを読み取る手順が含まれています。
オープン共有では、データ プロバイダーによってチームのメンバーと共有された資格情報ファイルを使用して、共有データへの安全な読み取りアクセスを取得します。 資格情報が有効であり、プロバイダーがデータを共有し続ける限り、アクセス権は保持されます。 プロバイダーは資格情報の有効期限とローテーションを管理します。 データの更新は、ほぼリアルタイムで利用できます。 共有データの読み取りとコピーはできますが、ソースデータを変更することはできません。
Note
データが Databricks 間 Delta Sharing を使用して共有されている場合、データにアクセスするために資格情報ファイルは必要なく、この記事は適用されません。 手順については、「Databricks 間 Delta Sharing (受信者用) を使用して共有されたデータの読み取り」を参照してください。
以下のセクションでは、Azure Databricks、Apache Spark、pandas、Power BI で資格情報ファイルを使用して、共有データにアクセスして読み取る方法について説明します。 Delta Sharing コネクタの完全なリストと使用方法に関する情報については、Delta Sharing オープンソースのドキュメントを参照してください。 共有データへのアクセスで問題が発生した場合は、データプロバイダーにお問い合わせください。
注意
別に記載がない限り、パートナー統合はサード パーティから提供されており、その製品およびサービスの利用には、適切なプロバイダーのアカウントを持っている必要があります。 Databricks ではこのコンテンツを最新の状態に保つように努めていますが、パートナー統合ページ上の統合やコンテンツの正確性については Microsoft が表明するものではありません。 統合に関しては適切なプロバイダーに連絡してください。
開始する前に
チームのメンバーは、データ プロバイダーによって共有されている資格情報ファイルをダウンロードする必要があります。 「オープン共有モデルでアクセスを取得する」を参照してください。
そのファイルまたはファイルの場所を共有するには、セキュリティで保護されたチャネルを使用する必要があります。
Azure Databricks: オープン共有コネクタを使用して共有データを読み取る
このセクションでは、オープン共有コネクタを使用して、Azure Databricks ワークスペースのノートブックを使用して共有データにアクセスする方法について説明します。 ユーザーまたはそのチームの別のメンバーが資格情報ファイルを DBFS に格納したら、それを使用してデータ プロバイダーの Azure Databricks アカウントに対する認証を行い、データ プロバイダーが共有しているデータを読み取ります。
Note
データ プロバイダーが Databricks 間共有を使用していて、資格情報ファイルを共有していない場合は、Unity Catalog を使用してデータにアクセスする必要があります。 手順については、「Databricks 間 Delta Sharing (受信者用) を使用して共有されたデータの読み取り」を参照してください。
この例では、独立して実行できる複数のセルを含むノートブックを作成します。 代わりに、同じセルにノートブック コマンドを追加して、順番に実行することもできます。
手順 1: 資格情報ファイルを DBFS に格納する (Python の手順)
この手順では、Azure Databricks の Python ノートブックを使用して資格情報ファイルを格納し、チームのユーザーが共有データにアクセスできるようにします。
自分またはチームの誰かが既に資格情報ファイルを DBFS に保存している場合は、次の手順に進みます。
テキスト エディターで、資格情報ファイルを開きます。
Azure Databricks ワークスペースで、[新規] > [ノートブック] をクリックします。
- 名前を入力します。
- ノートブックの既定の言語を Python に設定します。
- ノートブックにアタッチするクラスターを選択します。
- Create をクリックしてください。
ノートブックエディターでノートブックが開きます。
Python または Pandas を使用して共有データにアクセスするには、 Delta Sharing Python コネクタをインストールします。 ノートブックエディターで、次のコマンドを貼り付けます。
%sh pip install delta-sharing
セルを実行します。
delta-sharing
Python ライブラリがまだインストールされていない場合は、クラスターにインストールします。新しいセルに次のコマンドを貼り付けます。このコマンドは、資格情報ファイルの内容を DBFS 内のフォルダーにアップロードします。 変数を次のように置き換えます。
<dbfs-path>
: パスを資格情報ファイルを保存したいフォルダーに<credential-file-contents>
: 資格情報ファイルの内容 これは、ファイルへのパスではなく、ファイルのコピーされた内容です。資格情報ファイルには、
shareCredentialsVersion
、endpoint
、bearerToken
の 3 つのフィールドを定義する JSON が含まれています。%scala dbutils.fs.put("<dbfs-path>/config.share",""" <credential-file-contents> """)
セルを実行します。
資格情報ファイルがアップロードされたら、このセルを削除できます。 すべてのワークスペース ユーザーは DBFS から資格情報ファイルを読み取ることができ、資格情報ファイルは、すべてのクラスター上の DBFS とワークスペース内の SQL ウェアハウスで使用できます。 セルを削除するには、右端のセル操作メニューで、xをクリックします。
手順 2: ノートブックを使用して共有テーブルの一覧表示と読み取りを行う
この手順では、"共有" 内のテーブル、または共有テーブルとパーティションのセットを一覧表示し、テーブルに対してクエリを実行します。
Python を使用して、共有内のテーブルを一覧表示します。
新しいセルに次のコマンドを貼り付けます。
<dbfs-path>
を、「手順 1: 資格情報ファイルを DBFS に格納する (Python の手順)」で作成したパスに置き換えます。コードを実行すると、Python はクラスターの DBFS から資格情報ファイルを読み取ります。 パス
/dbfs/
で DBFS に格納されているデータにアクセスします。import delta_sharing client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share") client.list_all_tables()
セルを実行します。
結果は、テーブルの配列と各テーブルのメタデータとなります。 次の出力は、2つのテーブルを表示します。
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
出力が空か、期待されるテーブルが含まれていない場合は、データプロバイダーにお問い合わせください。
共有テーブルに対してクエリを実行します。
Scala の使用:
新しいセルに次のコマンドを貼り付けます。 コードを実行すると、資格情報ファイルが JVM を介して DBFS から読み取られます。
変数を次のように置き換えます。
<profile-path>
: 資格情報ファイルの DBFS パス。 たとえば、「/<dbfs-path>/config.share
」のように入力します。<share-name>
: テーブルのshare=
の値。<schema-name>
: テーブルのschema=
の値。<table-name>
: テーブルのname=
の値。
%scala spark.read.format("deltaSharing") .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
セルを実行します。 共有テーブルを読み込むたびに、ソースからの新しいデータが表示されます。
SQL の使用:
SQL を使用して共有データのクエリを実行するには、共有テーブルからワークスペースにローカル テーブルを作成した後、そのローカル テーブルにクエリを実行します。 共有データは、ローカルテーブルに保存またはキャッシュされません。 ローカルテーブルに対してクエリを実行するたびに、共有データの現在の状態が表示されます。
新しいセルに次のコマンドを貼り付けます。
変数を次のように置き換えます。
<local-table-name>
: ローカルテーブルの名前。<profile-path>
: 資格情報ファイルの場所。<share-name>
: テーブルのshare=
の値。<schema-name>
: テーブルのschema=
の値。<table-name>
: テーブルのname=
の値。
%sql DROP TABLE IF EXISTS table_name; CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>"; SELECT * FROM <local-table-name> LIMIT 10;
コマンドを実行すると、共有データに直接クエリが実行されます。 テストとして、テーブルに対してクエリが実行され、最初の 10 件の結果が返されます。
出力が空か、期待されるデータが含まれていない場合は、データプロバイダーにお問い合わせください。
Apache Spark: 共有データを読み取る
Apache Spark 3.x 以上を使用して共有データにアクセスするには、次の手順に従います。
以下の手順では、データ プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。
Delta Sharing Python コネクタと Spark コネクタをインストールする
共有されたテーブルの一覧など、共有データに関連するメタデータにアクセスするには、次の手順を行います。 この例では Python を使用します。
Delta Sharing Python コネクタをインストールします。
pip install delta-sharing
Apache Spark コネクタをインストールします。
Spark を使用して共有テーブルを一覧表示する
共有内のテーブルを一覧表示します。 次の例では、 <profile-path>
を資格情報ファイルの場所に置き換えます。
import delta_sharing
client = delta_sharing.SharingClient(f"<profile-path>/config.share")
client.list_all_tables()
結果は、テーブルの配列と各テーブルのメタデータとなります。 次の出力は、2つのテーブルを表示します。
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
出力が空か、期待されるテーブルが含まれていない場合は、データプロバイダーにお問い合わせください。
Spark を使用して共有データにアクセスする
次の変数を置き換えて、次のコマンドを実行します。
<profile-path>
: 資格情報ファイルの場所。<share-name>
: テーブルのshare=
の値。<schema-name>
: テーブルのschema=
の値。<table-name>
: テーブルのname=
の値。<version-as-of>
: 省略可能。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。delta-sharing-spark
0.5.0 以上が必要です。<timestamp-as-of>
: 省略可能。 特定のタイムスタンプまたはそれより前のバージョンでデータを読み込みます。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。delta-sharing-spark
0.6.0 以上が必要です。
Python
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)
spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)
spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))
Scala
次の変数を置き換えて、次のコマンドを実行します。
<profile-path>
: 資格情報ファイルの場所。<share-name>
: テーブルのshare=
の値。<schema-name>
: テーブルのschema=
の値。<table-name>
: テーブルのname=
の値。<version-as-of>
: 省略可能。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。delta-sharing-spark
0.5.0 以上が必要です。<timestamp-as-of>
: 省略可能。 特定のタイムスタンプまたはそれより前のバージョンでデータを読み込みます。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。delta-sharing-spark
0.6.0 以上が必要です。
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)
spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)
Spark を使用して共有変更データ フィードにアクセスする
テーブル履歴が自分と共有されていて、ソース テーブルで変更データ フィード (CDF) が有効になっている場合は、次を実行して変更データ フィードにアクセスできます (これらの変数を置き換えます)。 delta-sharing-spark
0.5.0 以上が必要です。
開始パラメーターは 1 つだけ指定する必要があります。
<profile-path>
: 資格情報ファイルの場所。<share-name>
: テーブルのshare=
の値。<schema-name>
: テーブルのschema=
の値。<table-name>
: テーブルのname=
の値。<starting-version>
: 省略可能。 クエリの開始バージョン (これを含む)。 Long として指定します。<ending-version>
: 省略可能。 クエリの終了バージョン (これを含む)。 終了バージョンが指定されていない場合、API では最新のテーブル バージョンが使用されます。<starting-timestamp>
: 省略可能。 クエリの開始タイムスタンプ。このタイムスタンプ以降に作成されたバージョンに変換されます。yyyy-mm-dd hh:mm:ss[.fffffffff]
形式の文字列として指定します。<ending-timestamp>
: 省略可能。 クエリの終了タイムスタンプ。このタイムスタンプ以前に作成されたバージョンに変換されます。yyyy-mm-dd hh:mm:ss[.fffffffff]
形式の文字列として指定します
Python
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<ending-version>)
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)
spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Scala
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
出力が空か、期待されるデータが含まれていない場合は、データプロバイダーにお問い合わせください。
Spark 構造化ストリームを使用して共有テーブルにアクセスする
テーブル履歴が共有されている場合は、共有データの読み取りをストリーミングできます。 delta-sharing-spark
0.6.0 以上が必要です。
サポートされているオプション:
ignoreDeletes
: データを削除するトランザクションを無視します。ignoreChanges
:UPDATE
、MERGE INTO
、DELETE
(パーティション内)、OVERWRITE
などのデータ変更操作のためにファイルがソース テーブルで書き換えられた場合は、更新を再処理します。 変更されていない行は、引き続き出力できます。 そのため、ダウンストリームのコンシューマーが重複を処理できる必要があります。 削除はダウンストリームには反映されません。ignoreChanges
はignoreDeletes
を含みます。 そのため、ignoreChanges
を使用するろ、ソース テーブルに対する削除または更新によってストリームが中断されることはありません。startingVersion
: 対象となる最初の共有テーブルのバージョン。 このバージョン (を含む) 以降のすべてのテーブル変更は、ストリーミング ソースによって読み取りされます。startingTimestamp
: 対象となる最初のタイムスタンプ。 このタイムスタンプ (を含む) 以降にコミットされたすべてのテーブル変更は、ストリーミング ソースによって読み取りされます。 例:"2023-01-01 00:00:00.0"
.maxFilesPerTrigger
: すべてのマイクロバッチで考慮される新しいファイルの数。maxBytesPerTrigger
: 各マイクロバッチで処理されるデータの量。 このオプションにより "ソフト最大値" が設定されます。これは、最小の入力単位がこの制限を超える場合にストリーミング クエリを進めるために、バッチでほぼこの量のデータが処理され、制限を超える処理が行われる可能性があることを意味します。readChangeFeed
: 共有テーブルの変更データ フィードをストリームで読み取ります。
サポートされていないオプション:
Trigger.availableNow
サンプルの構造化ストリーミング クエリ
Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Python
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
「Azure Databricks でのストリーミング」も参照してください。
削除ベクターまたは列マッピングが有効になっているテーブルの読み取り
重要
この機能はパブリック プレビュー段階にあります。
削除ベクトルは、プロバイダーが共有 Delta テーブルで有効にできるストレージ最適化機能です。 「削除ベクトルとは」を参照してください。
Azure Databricks では、Delta テーブルの列マッピングもサポートされています。 「Delta Lake の列マッピングを使用して列の名前変更と削除を行う」をご覧ください。
削除ベクトルまたは列マッピングが有効になっているテーブルをプロバイダーが共有している場合は、delta-sharing-spark
3.1 以降を実行しているコンピューティングを使用してそのテーブルを読み取ることができます。 Databricks クラスターを使用している場合は、Databricks Runtime 14.1 以降を実行しているクラスターを使用してバッチ読み取りを実行できます。 CDF とストリーミングの各クエリには、Databricks Runtime 14.2 以降が必要です。
バッチ クエリは、共有テーブルのテーブル機能に基づいて responseFormat
を自動的に解決できるため、そのまま実行できます。
変更データ フィード (CDF) を読み取ったり、削除ベクトルまたは列マッピングが有効になっている共有テーブルに対してストリーミング クエリを実行したりするには、追加のオプション responseFormat=delta
を設定する必要があります。
次の例は、バッチ、CDF、ストリーミングの各クエリを示しています。
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
// Batch query
spark.read.format("deltaSharing").load(tablePath)
// CDF query
spark.read.format("deltaSharing")
.option("readChangeFeed", "true")
.option("responseFormat", "delta")
.option("startingVersion", 1)
.load(tablePath)
// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)
Pandas: 共有データを読み取る
Pandas 0.25.3 以上で共有データにアクセスするには、次の手順に従います。
以下の手順では、データ プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。
Delta Sharing Python コネクタをインストールする
共有されたテーブルの一覧など、共有データに関連するメタデータにアクセスするには、Delta Sharing Python コネクタをインストールします。
pip install delta-sharing
pandas を使用して共有テーブルを一覧表示する
共有内のテーブルを一覧表示するには、<profile-path>/config.share
を資格情報ファイルの場所に置き換えて、次を実行します。
import delta_sharing
client = delta_sharing.SharingClient(f"<profile-path>/config.share")
client.list_all_tables()
出力が空か、期待されるテーブルが含まれていない場合は、データプロバイダーにお問い合わせください。
pandas を使用して共有データにアクセスする
Python を使用して pandas で共有データにアクセスするには、変数を次のように置き換えて、次を実行します。
<profile-path>
: 資格情報ファイルの場所。<share-name>
: テーブルのshare=
の値。<schema-name>
: テーブルのschema=
の値。<table-name>
: テーブルのname=
の値。
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")
pandas を使用して共有変更データ フィードにアクセスする
Python を使用して pandas で共有テーブルの変更データ フィードにアクセスするには、変数を次のように置き換えて、次を実行します。 データ プロバイダーがテーブルの変更データ フィードを共有しているかどうかによって、変更データ フィードを使用できない場合があります。
<starting-version>
: 省略可能。 クエリの開始バージョン (これを含む)。<ending-version>
: 省略可能。 クエリの終了バージョン (これを含む)。<starting-timestamp>
: 省略可能。 クエリの開始タイムスタンプ。 このタイムスタンプ以降に作成されたバージョンに変換されます。<ending-timestamp>
: 省略可能。 クエリの終了タイムスタンプ。 このタイムスタンプ以前に作成されたバージョンに変換されます。
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<starting-version>)
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)
出力が空か、期待されるデータが含まれていない場合は、データプロバイダーにお問い合わせください。
Power BI: 共有データを読み取る
Power BI Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを使用して、共有されているデータセットを検出、分析、視覚化できます。
必要条件
- Power BI Desktop 2.99.621.0 以上。
- データ プロバイダーによって共有された資格情報ファイルにアクセスします。 「オープン共有モデルでアクセスを取得する」を参照してください。
Databricks に接続する
Delta Sharing コネクタを使用して Azure Databricks に接続するには、次の手順を行います。
- テキスト エディターで資格情報ファイルを開き、エンドポイント URL とトークンを取得します。
- Power BI Desktop を開きます。
- [データを取得する] メニューで、[Delta Sharing] を見つけます。
- コネクタを選び、[接続] をクリックします。
- [Delta Sharing サーバーの URL] フィールドに、資格情報ファイルから取得したエンドポイント URL を入力します。
- 必要に応じて、[詳細設定オプション] タブで、ダウンロードできる行の最大数に対して [行数の制限] を設定できます。 既定では、これは 100 万行に設定されています。
- [OK] をクリックします。
- [認証] では、資格情報ファイルから取得したトークンをベアラー トークンにコピーします。
- [接続] をクリックします。
Power BI Delta Sharing コネクタの制限事項
Power BI Delta Sharing コネクタには、次の制限があります。
- コネクタによって読み込まれるデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタは、インポートされる行数を、Power BI Desktop の [詳細設定オプション] タブで設定した [行の制限] に制限します。
Tableau: 共有データを読み取る
Tableau Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを使用して共有されているデータセットを検出、分析、視覚化できます。
要件
- Tableau Desktop と Tableau Server 2024.1 以降
- データ プロバイダーによって共有された資格情報ファイルにアクセスします。 「オープン共有モデルでアクセスを取得する」を参照してください。
Azure Databricks に接続する
Delta Sharing コネクタを使用して Azure Databricks に接続するには、次の手順を行います。
- Tableau Exchange に移動し、指示に従って Delta Sharing コネクタをダウンロードし、適切なデスクトップ フォルダーに配置します。
- Tableau Desktop を開きます。
- [コネクタ] ページで、"Databricks による Delta Sharing" を検索します。
- [共有ファイルのアップロード] を選択し、プロバイダーによって共有された資格情報ファイルを選びます。
- [データの取得] をクリックします。
- データ エクスプローラーで、テーブルを選びます。
- 必要に応じて、SQL フィルターまたは行の制限を追加します。
- [テーブル データの取得] をクリックします。
Tableau Delta Sharing コネクタの制限事項
Tableau Delta Sharing コネクタには、次の制限があります。
- コネクタによって読み込まれるデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタはインポートされる行の数を Tableau で設定した行制限に制限します。
- すべての列は型
String
として返されます。 - SQL フィルターは、Delta Sharing サーバーが predicateHint をサポートしている場合にのみ機能します。
新しい資格情報を要求する
資格情報のアクティブ化 URL やダウンロードされた資格情報が失われるか、破損するか、侵害された場合、または資格情報の有効期限が切れてプロバイダーから新しい資格情報が送信されていない場合は、プロバイダーに連絡して新しい資格情報を要求してください。