CREATE STREAMING TABLE
適用対象: Databricks SQL Databricks Runtime 13.3 LTS 以上
重要
この機能はパブリック プレビュー段階にあります。
ストリーミングまたは増分データ処理を追加でサポートする Delta テーブルである "ストリーミング テーブル" を作成します。
ストリーミング テーブルは、Delta Live Tables と Databricks SQL (と Unity Catalog) でのみサポートされます。 サポートされている Databricks Runtime コンピューティングでこのコマンドを実行すると、構文のみが解析されます。 「SQL を使用して Delta Live Tables パイプラインを実装する」をご覧ください。
構文
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] |
WITH { ROW FILTER clause } } [...]
パラメーター
REFRESH
指定した場合、クエリで定義されているソースから利用できる最新のデータでテーブルが更新されます。 クエリが開始される前に到着した新しいデータのみが処理されます。 コマンドの実行中にソースに追加される新しいデータは次の更新まで無視されます。
IF NOT EXISTS
指定した場合、同じ名前のテーブルが既に存在すると、ステートメントは無視されます。
IF NOT EXISTS
はREFRESH
と共に使用できません。つまり、CREATE OR REFRESH TABLE IF NOT EXISTS
は許可されません。-
作成されるテーブルの名前。 この名前には、テンポラル仕様を含めることはできません。 名前が修飾されていない場合、テーブルは現在のスキーマに作成されます。
table_specification
この省略可能な句で、列、その型、プロパティ、説明、および列制約の一覧を定義します。
テーブル スキーマで列を定義しない場合、
AS query
を指定する必要があります。-
列の一意の名前。
-
列のデータ型を指定します。
NOT NULL
指定した場合、列は
NULL
値を受け取りません。COMMENT column_comment
列について説明する文字列リテラル。
-
重要
この機能はパブリック プレビュー段階にあります。
ストリーミング テーブル内の列に主キーまたは外部キー制約を追加します。 制約は、
hive_metastore
カタログ内のテーブルではサポートされていません。 -
重要
この機能はパブリック プレビュー段階にあります。
列マスク関数を追加して、機密データを匿名化します。 今後その列からのすべてのクエリでは、列の元の値の代わりに、列に対してその関数の評価結果を受け取ります。 これは、値を編集するかどうかを決定するために呼び出したユーザーの ID やグループ メンバーシップを関数で検査できる、きめ細かいアクセス制御に役立ちます。
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
テーブルにデータ品質の期待値を追加します。 このデータ品質の期待値は、一定期間追跡し、ストリーミング テーブルのイベント ログを介してアクセスできます。 テーブルの作成時とテーブル更新時の両方で、
FAIL UPDATE
期待値により処理が失敗します。DROP ROW
期待値が満たされない場合、行全体が削除されます。expectation_expr
は、以下のものを除く、リテラル、テーブル内の列識別子、および決定論的な組み込みの SQL 関数または演算子で構成される場合があります。また
expr
には、サブクエリを含めることはできません。-
重要
この機能はパブリック プレビュー段階にあります。
情報主キーまたは情報外部キーの制約をストリーミング テーブルに追加します。 主な制約は、
hive_metastore
カタログ内のテーブルに対してはサポートされません。
-
-
table_clauses
必要に応じて、パーティション分割、コメント、ユーザー定義プロパティ、新しいテーブルの更新スケジュールを指定します。 各サブ句は、1 回だけ指定できます。
-
テーブルをパーティション分割するための、テーブルの列の省略可能な一覧。
COMMENT table_comment
テーブルについて説明する
STRING
リテラル。-
必要に応じて、1 つ以上のユーザー定義プロパティを設定します。
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]
指定した場合、ストリーミング テーブルまたは具体化されたビューをスケジュール設定し、指定された quartz cron スケジュールでデータが更新されるようにします。 time_zone_values のみが受け入れられます。
AT TIME ZONE LOCAL
はサポートされません。AT TIME ZONE
が存在しない場合は、セッション タイム ゾーンが使用されます。AT TIME ZONE
が存在せず、セッション タイム ゾーンも設定されていない場合は、エラーがスローされます。SCHEDULE
は意味的にSCHEDULE REFRESH
と同等です。Delta Live Tables パイプライン定義で
SCHEDULE
構文を使用することはできません。SCHEDULE
句はCREATE OR REFRESH
コマンドで許可されません。 スケジュールはCREATE
コマンドの一部として指定できます。 作成後、ストリーミング テーブルのスケジュールを変更するには ALTER STREAMING TABLE を使用します。WITH ROW FILTER 句
重要
この機能はパブリック プレビュー段階にあります。
行フィルター関数をテーブルに追加します。 今後そのテーブルからのすべてのクエリでは、関数が Boolean TRUE に評価される行のサブセットを受け取ります。 これは、特定の行をフィルタリングするかどうかを決定するために呼び出したユーザーの ID やグループ メンバーシップを関数で検査できる、きめ細かいアクセス制御に役立ちます。
-
AS クエリ
この句により、
query
からデータがテーブルに入力されます。 このクエリはストリーミング クエリにする必要があります。 そのためには増分的に処理するリレーションにSTREAM
キーワードを追加します。query
とtable_specification
を一緒に指定するとき、table_specification
に指定されているテーブル スキーマに、query
から返される列をすべて含める必要があります。含まれていない場合、エラーが出ます。table_specification
で指定されているが、query
から返されない列はクエリ時にnull
値を返します。この句は Databricks SQL で作成されたストリーミング テーブルに必要ですが、Delta Live Tables には必要ありません。 Delta Live Tables でこの句が指定されていない場合、DLT パイプラインの
APPLY CHANGES
コマンドでこのテーブルを参照する必要があります。 「Delta Live Tables の SQL を使用した変更データ キャプチャ」を参照してください。
ストリーミング テーブルと他のテーブルの違い
ストリーミング テーブルはステートフル テーブルであり、増加するデータセットを処理するときに各行を 1 回だけ処理するように設計されています。 ほとんどのデータセットは時間が経過するにつれて増大し続けるため、ストリーミング テーブルは、大半のインジェスト ワークロードに適しています。 ストリーミング テーブルは、データの鮮度と待ち時間の短さが要求されるパイプラインに最適です。 また、非常に大規模な変換を行う用途にも適しています。これは、新しいデータが入ってくるのに応じて増分方式で結果を計算し続けて最新の状態に保つことができ、更新のたびにソース データ全体を再計算する必要がないためです。 ストリーミング テーブルは追加専用のデータ ソースを想定して設計されています。
ストリーミング テーブルは、REFRESH
などの追加コマンドを受け取ります。このコマンドは、クエリで提供されるソースで利用できる最新のデータを処理します。 指定されたクエリに対する変更は、以前に処理されたデータではなく、REFRESH
を呼び出すことによって新しいデータにのみ反映されます。 既存のデータにも変更を適用するには、FULL REFRESH
を実行するために REFRESH TABLE <table_name> FULL
を実行する必要があります。 完全更新では、最新の定義を使用して、ソースで使用可能なすべてのデータが再処理されます。 完全更新では既存のデータが切り詰められるため、データの履歴全体を保持しないソースや、Kafka など、保持期間が短いソースの場合、完全更新の呼び出しは推奨されません。 ソースでデータが使用できなくなった場合、古いデータを回復できないことがあります。
行フィルターと列マスク
重要
この機能はパブリック プレビュー段階にあります。
行フィルターを使用すると、テーブル スキャンで行がフェッチされるたびにフィルターとして適用される関数を指定できます。 これらのフィルターにより、後続のクエリでフィルター述語が true と評価される行のみが返されるようになります。
列マスクを使用すると、テーブル スキャンで行がフェッチされるたびに列の値をマスクできます。 その列に関連する今後のすべてのクエリでは、列の元の値を置き換えて、列に対してその関数を評価した結果が返されます。
行フィルターと列マスクの使用方法の詳細については、「行フィルターと列マスクを使って機密性の高いテーブル データをフィルター処理する」を参照してください。
行フィルターと列マスクの管理
ストリーミング テーブルの行フィルターと列マスクは、CREATE OR REFRESH
ステートメントを通じて追加、更新、または削除する必要があります。
Behavior
- 定義者として更新:
CREATE OR REFRESH
またはREFRESH
ステートメントがストリーミング テーブルを更新すると、行フィルター関数は定義者の権限で (テーブル所有者として) 実行されます。 つまり、テーブルの更新では、ストリーミング テーブルを作成したユーザーのセキュリティ コンテキストが使用されます。 - クエリ: ほとんどのフィルターは定義者の権限で実行されますが、ユーザー コンテキストをチェックする関数 (
CURRENT_USER
やIS_MEMBER
など) は例外です。 これらの関数は呼び出し元として実行されます。 このアプローチでは、現在のユーザーのコンテキストに基づいて、ユーザー固有のデータ セキュリティとアクセス制御が適用されます。
可観測性
DESCRIBE EXTENDED
、INFORMATION_SCHEMA
、またはカタログ エクスプローラーを使用して、特定のストリーミング テーブルに適用される既存の行フィルターと列マスクを調べます。 この機能により、ユーザーはストリーミング テーブルのデータ アクセスと保護対策を監査および確認できます。
制限事項
テーブル所有者だけがストリーミング テーブルを更新して最新のデータを取得できます。
ALTER TABLE
コマンドはストリーミング テーブルでは許可されません。 テーブルの定義とプロパティは、CREATE OR REFRESH
またはALTER STREAMING TABLE
ステートメントを使用して変更する必要があります。タイム トラベル クエリはサポートされていません。
INSERT INTO
やMERGE
などの DML コマンドを利用してテーブル スキーマを導き出すことはできません。ストリーミング テーブルでは、次のコマンドはサポートされていません。
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Delta Sharing はサポートされていません。
テーブルの名前変更や所有者の変更はサポートされていません。
PRIMARY KEY
やFOREIGN KEY
などのテーブル制約はサポートされていません。生成された列、ID 列、既定の列はサポートされていません。
例
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')
関連記事
フィードバック
https://aka.ms/ContentUserFeedback」を参照してください。
以下は間もなく提供いたします。2024 年を通じて、コンテンツのフィードバック メカニズムとして GitHub の issue を段階的に廃止し、新しいフィードバック システムに置き換えます。 詳細については、「フィードバックの送信と表示