CREATE STREAMING TABLE

適用対象: check marked yes Databricks SQL 「はい」のチェック マーク Databricks Runtime 13.3 LTS 以上

重要

この機能はパブリック プレビュー段階にあります。

ストリーミングまたは増分データ処理を追加でサポートする Delta テーブルである "ストリーミング テーブル" を作成します。

ストリーミング テーブルは、Delta Live Tables と Databricks SQL (と Unity Catalog) でのみサポートされます。 サポートされている Databricks Runtime コンピューティングでこのコマンドを実行すると、構文のみが解析されます。 「SQL を使用して Delta Live Tables パイプラインを実装する」をご覧ください。

構文

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] |
    WITH { ROW FILTER clause } } [...]

パラメーター

  • REFRESH

    指定した場合、クエリで定義されているソースから利用できる最新のデータでテーブルが更新されます。 クエリが開始される前に到着した新しいデータのみが処理されます。 コマンドの実行中にソースに追加される新しいデータは次の更新まで無視されます。

  • IF NOT EXISTS

    指定した場合、同じ名前のテーブルが既に存在すると、ステートメントは無視されます。

    IF NOT EXISTSREFRESH と共に使用できません。つまり、CREATE OR REFRESH TABLE IF NOT EXISTS は許可されません。

  • table_name

    作成されるテーブルの名前。 この名前には、テンポラル仕様を含めることはできません。 名前が修飾されていない場合、テーブルは現在のスキーマに作成されます。

  • table_specification

    この省略可能な句で、列、その型、プロパティ、説明、および列制約の一覧を定義します。

    テーブル スキーマで列を定義しない場合、AS query を指定する必要があります。

    • column_identifier

      列の一意の名前。

      • column_type

        列のデータ型を指定します。

      • NOT NULL

        指定した場合、列は NULL 値を受け取りません。

      • COMMENT column_comment

        列について説明する文字列リテラル。

      • column_constraint

        重要

        この機能はパブリック プレビュー段階にあります。

        ストリーミング テーブル内の列に主キーまたは外部キー制約を追加します。 制約は、hive_metastore カタログ内のテーブルではサポートされていません。

      • MASK 句

        重要

        この機能はパブリック プレビュー段階にあります。

        列マスク関数を追加して、機密データを匿名化します。 今後その列からのすべてのクエリでは、列の元の値の代わりに、列に対してその関数の評価結果を受け取ります。 これは、値を編集するかどうかを決定するために呼び出したユーザーの ID やグループ メンバーシップを関数で検査できる、きめ細かいアクセス制御に役立ちます。

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        テーブルにデータ品質の期待値を追加します。 このデータ品質の期待値は、一定期間追跡し、ストリーミング テーブルのイベント ログを介してアクセスできます。 テーブルの作成時とテーブル更新時の両方で、FAIL UPDATE 期待値により処理が失敗します。 DROP ROW 期待値が満たされない場合、行全体が削除されます。

        expectation_expr は、以下のものを除く、リテラル、テーブル内の列識別子、および決定論的な組み込みの SQL 関数または演算子で構成される場合があります。

        また expr には、サブクエリを含めることはできません。

      • table_constraint

        重要

        この機能はパブリック プレビュー段階にあります。

        情報主キーまたは情報外部キーの制約をストリーミング テーブルに追加します。 主な制約は、hive_metastore カタログ内のテーブルに対してはサポートされません。

  • table_clauses

    必要に応じて、パーティション分割、コメント、ユーザー定義プロパティ、新しいテーブルの更新スケジュールを指定します。 各サブ句は、1 回だけ指定できます。

    • PARTITIONED BY

      テーブルをパーティション分割するための、テーブルの列の省略可能な一覧。

    • COMMENT table_comment

      テーブルについて説明する STRING リテラル。

    • TBLPROPERTIES

      必要に応じて、1 つ以上のユーザー定義プロパティを設定します。

    • SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]

      指定した場合、ストリーミング テーブルまたは具体化されたビューをスケジュール設定し、指定された quartz cron スケジュールでデータが更新されるようにします。 time_zone_values のみが受け入れられます。 AT TIME ZONE LOCAL はサポートされません。 AT TIME ZONE が存在しない場合は、セッション タイム ゾーンが使用されます。 AT TIME ZONE が存在せず、セッション タイム ゾーンも設定されていない場合は、エラーがスローされます。 SCHEDULE は意味的に SCHEDULE REFRESH と同等です。

      Delta Live Tables パイプライン定義で SCHEDULE 構文を使用することはできません。

      SCHEDULE 句は CREATE OR REFRESH コマンドで許可されません。 スケジュールは CREATE コマンドの一部として指定できます。 作成後、ストリーミング テーブルのスケジュールを変更するには ALTER STREAMING TABLE を使用します。

    • WITH ROW FILTER 句

      重要

      この機能はパブリック プレビュー段階にあります。

      行フィルター関数をテーブルに追加します。 今後そのテーブルからのすべてのクエリでは、関数が Boolean TRUE に評価される行のサブセットを受け取ります。 これは、特定の行をフィルタリングするかどうかを決定するために呼び出したユーザーの ID やグループ メンバーシップを関数で検査できる、きめ細かいアクセス制御に役立ちます。

  • AS クエリ

    この句により、query からデータがテーブルに入力されます。 このクエリはストリーミング クエリにする必要があります。 そのためには増分的に処理するリレーションに STREAM キーワードを追加します。 querytable_specification を一緒に指定するとき、table_specification に指定されているテーブル スキーマに、query から返される列をすべて含める必要があります。含まれていない場合、エラーが出ます。 table_specification で指定されているが、query から返されない列はクエリ時に null 値を返します。

    この句は Databricks SQL で作成されたストリーミング テーブルに必要ですが、Delta Live Tables には必要ありません。 Delta Live Tables でこの句が指定されていない場合、DLT パイプラインの APPLY CHANGES コマンドでこのテーブルを参照する必要があります。 「Delta Live Tables の SQL を使用した変更データ キャプチャ」を参照してください。

ストリーミング テーブルと他のテーブルの違い

ストリーミング テーブルはステートフル テーブルであり、増加するデータセットを処理するときに各行を 1 回だけ処理するように設計されています。 ほとんどのデータセットは時間が経過するにつれて増大し続けるため、ストリーミング テーブルは、大半のインジェスト ワークロードに適しています。 ストリーミング テーブルは、データの鮮度と待ち時間の短さが要求されるパイプラインに最適です。 また、非常に大規模な変換を行う用途にも適しています。これは、新しいデータが入ってくるのに応じて増分方式で結果を計算し続けて最新の状態に保つことができ、更新のたびにソース データ全体を再計算する必要がないためです。 ストリーミング テーブルは追加専用のデータ ソースを想定して設計されています。

ストリーミング テーブルは、REFRESH などの追加コマンドを受け取ります。このコマンドは、クエリで提供されるソースで利用できる最新のデータを処理します。 指定されたクエリに対する変更は、以前に処理されたデータではなく、REFRESH を呼び出すことによって新しいデータにのみ反映されます。 既存のデータにも変更を適用するには、FULL REFRESH を実行するために REFRESH TABLE <table_name> FULL を実行する必要があります。 完全更新では、最新の定義を使用して、ソースで使用可能なすべてのデータが再処理されます。 完全更新では既存のデータが切り詰められるため、データの履歴全体を保持しないソースや、Kafka など、保持期間が短いソースの場合、完全更新の呼び出しは推奨されません。 ソースでデータが使用できなくなった場合、古いデータを回復できないことがあります。

行フィルターと列マスク

重要

この機能はパブリック プレビュー段階にあります。

行フィルターを使用すると、テーブル スキャンで行がフェッチされるたびにフィルターとして適用される関数を指定できます。 これらのフィルターにより、後続のクエリでフィルター述語が true と評価される行のみが返されるようになります。

列マスクを使用すると、テーブル スキャンで行がフェッチされるたびに列の値をマスクできます。 その列に関連する今後のすべてのクエリでは、列の元の値を置き換えて、列に対してその関数を評価した結果が返されます。

行フィルターと列マスクの使用方法の詳細については、「行フィルターと列マスクを使って機密性の高いテーブル データをフィルター処理する」を参照してください。

行フィルターと列マスクの管理

ストリーミング テーブルの行フィルターと列マスクは、CREATE OR REFRESH ステートメントを通じて追加、更新、または削除する必要があります。

Behavior

  • 定義者として更新: CREATE OR REFRESH または REFRESH ステートメントがストリーミング テーブルを更新すると、行フィルター関数は定義者の権限で (テーブル所有者として) 実行されます。 つまり、テーブルの更新では、ストリーミング テーブルを作成したユーザーのセキュリティ コンテキストが使用されます。
  • クエリ: ほとんどのフィルターは定義者の権限で実行されますが、ユーザー コンテキストをチェックする関数 (CURRENT_USERIS_MEMBER など) は例外です。 これらの関数は呼び出し元として実行されます。 このアプローチでは、現在のユーザーのコンテキストに基づいて、ユーザー固有のデータ セキュリティとアクセス制御が適用されます。

可観測性

DESCRIBE EXTENDEDINFORMATION_SCHEMA、またはカタログ エクスプローラーを使用して、特定のストリーミング テーブルに適用される既存の行フィルターと列マスクを調べます。 この機能により、ユーザーはストリーミング テーブルのデータ アクセスと保護対策を監査および確認できます。

制限事項

  • テーブル所有者だけがストリーミング テーブルを更新して最新のデータを取得できます。

  • ALTER TABLE コマンドはストリーミング テーブルでは許可されません。 テーブルの定義とプロパティは、CREATE OR REFRESH または ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。

  • タイム トラベル クエリはサポートされていません。

  • INSERT INTOMERGE などの DML コマンドを利用してテーブル スキーマを導き出すことはできません。

  • ストリーミング テーブルでは、次のコマンドはサポートされていません。

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing はサポートされていません。

  • テーブルの名前変更や所有者の変更はサポートされていません。

  • PRIMARY KEYFOREIGN KEY などのテーブル制約はサポートされていません。

  • 生成された列、ID 列、既定の列はサポートされていません。

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')