CREATE STREAMING TABLE

Si applica a: segno di spunta sì Databricks SQL

Crea una tabella di streaming, una tabella Delta con supporto aggiuntivo per lo streaming o l'elaborazione incrementale dei dati.

Le tabelle di streaming sono supportate solo nelle tabelle Delta Live e in Databricks SQL con Unity Catalog. L'esecuzione di questo comando nel calcolo di Databricks Runtime supportato analizza solo la sintassi. Vedere Implementare una pipeline di Delta Live Tables con SQL.

Sintassi

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Parametri

  • REFRESH

    Se specificato, aggiorna la tabella con i dati più recenti disponibili dalle origini definite nella query. Vengono elaborati solo i nuovi dati che arrivano prima dell'avvio della query. I nuovi dati aggiunti alle origini durante l'esecuzione del comando vengono ignorati fino al successivo aggiornamento. L'operazione di aggiornamento di CREATE o REFRESH è completamente dichiarativa. Se un comando refresh non specifica tutti i metadati dell'istruzione di creazione della tabella originale, i metadati non specificati vengono eliminati.

  • SE NON ESISTE

    Crea la tabella di streaming nel caso questa non esista. Se esiste già una tabella con questo nome, l'istruzione CREATE STREAMING TABLE viene ignorata.

    È possibile specificare al massimo uno di IF NOT EXISTS o OR REFRESH.

  • table_name

    Il nome della tabella da creare. Il nome non deve includere una specifica temporale. Se il nome non è qualificato, la tabella viene creata nello schema corrente.

  • table_specification

    Questa clausola facoltativa definisce l'elenco di colonne, i relativi tipi, proprietà, descrizioni e vincoli di colonna.

    Se non si definiscono colonne nello schema della tabella, è necessario specificare AS query.

    • column_identifier

      Nome univoco per la colonna.

      • column_type

        Specifica il tipo di dati della colonna.

      • NOT NULL

        Se la colonna specificata non accetta valori NULL.

      • COMMENT column_comment

        Valore letterale stringa per descrivere la colonna.

      • column_constraint

        Importante

        Questa funzionalità è disponibile in anteprima pubblica.

        Aggiunge una chiave primaria o un vincolo di chiave esterna alla colonna in una tabella di streaming. I vincoli non sono supportati per le tabelle nel catalogo hive_metastore.

      • Clausola MASK

        Importante

        Questa funzionalità è disponibile in anteprima pubblica.

        Aggiunge una funzione maschera di colonna per rendere anonimi i dati sensibili. Tutte le query successive da tale colonna ricevono il risultato della valutazione della funzione sulla colonna al posto del valore originale della colonna. Ciò può essere utile per scopi di controllo di accesso con granularità fine, in cui la funzione può esaminare l'identità o le appartenenze a gruppi dell'utente che richiama per decidere se revisionare il valore.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        Aggiunge alla tabella aspettative sulla qualità dei dati. Queste aspettative sulla qualità dei dati possono essere rilevate nel tempo e accessibili tramite il registro eventi della tabella di streaming. Un’aspettativa FAIL UPDATE causa un errore di elaborazione sia quando si crea la tabella che quando si aggiorna la tabella. Se l'aspettativa DROP ROW non viene soddisfatta, l'intera riga viene eliminata.

        expectation_expr può essere composto da valori letterali, identificatori di colonna all'interno della tabella e funzioni SQL predefinite o operatori, ad eccezione di:

        Inoltre expr, non deve contenere alcuna sottoquery.

      • table_constraint

        Importante

        Questa funzionalità è disponibile in anteprima pubblica.

        Aggiunge una chiave primaria informativa o vincoli di chiave esterna informativi a una tabella di streaming. I vincoli di chiave non sono supportati per le tabelle nel catalogo hive_metastore.

  • table_clauses

    Facoltativamente, specificare partizionamento, commenti, proprietà definite dall'utente e una pianificazione di aggiornamento per la nuova tabella. Ogni clausola secondaria può essere specificata una sola volta.

    • PARTIZIONATO DA

      Elenco facoltativo di colonne della tabella per partizionare la tabella.

    • COMMENT table_comment

      Valore STRING letterale per descrivere la tabella.

    • TBLPROPERTIES

      Facoltativamente, imposta una o più proprietà definite dall'utente.

      Usare questa impostazione per specificare il canale di runtime delta live tables usato per eseguire questa istruzione. Impostare il valore della pipelines.channel proprietà su "PREVIEW" o "CURRENT". Il valore predefinito è "CURRENT". Per altre informazioni sui canali di tabelle live Delta, vedere Canali di runtime di Tabelle live Delta.

    • SCHEDULE [ REFRESH ] schedule_clause

      • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

        Importante

        Questa funzionalità è disponibile in anteprima pubblica.

        Per pianificare un aggiornamento che si verifica periodicamente, usare la EVERY sintassi. Se EVERY si specifica la sintassi, la tabella di streaming o la vista materializzata viene aggiornata periodicamente all'intervallo specificato in base al valore specificato, ad esempio HOUR, HOURSDAY, DAYS, , WEEKo WEEKS. Nella tabella seguente sono elencati i valori integer accettati per number.

        Time unit Valore intero
        HOUR or HOURS 1 <= H <= 72
        DAY or DAYS 1 <= D <= 31
        WEEK or WEEKS 1 <= W <= 8

        Nota

        Le forme singolari e plurali dell'unità temporale inclusa sono semanticamente equivalenti.

      • CRON cron_string [ AT TIME ZONE timezone_id ]

        Per pianificare un aggiornamento utilizzando un valore cron di quarzi. Vengono accettati time_zone_values validi. AT TIME ZONE LOCAL non è supportata.

        Se AT TIME ZONE è assente, viene usato il fuso orario della sessione. Se AT TIME ZONE è assente e il fuso orario della sessione non è impostato, viene generato un errore. SCHEDULE è semanticamente equivalente a SCHEDULE REFRESH.

      La pianificazione può essere specificata come parte del comando CREATE. Usare ALTER STREAMING TABLE o eseguire il comando CREATE OR REFRESH con clausola SCHEDULE per modificare la pianificazione di una tabella di streaming dopo la creazione.

    • Con clausola ROW FILTER

      Importante

      Questa funzionalità è disponibile in anteprima pubblica.

      Aggiunge una funzione di filtro di riga alla tabella. Tutte le query successive da tale tabella ricevono un subset delle righe in cui la funzione restituisce TRUE booleano. Ciò può essere utile per scopi di controllo di accesso con granularità fine in cui la funzione può controllare l'identità o le appartenenze ai gruppi dell'utente che richiama per decidere se filtrare determinate righe.

  • Query AS

    Questa clausola popola la tabella usando i dati di query. Questa query deve essere una query di streaming. A tale scopo, è possibile aggiungere la parola chiave STREAM a qualsiasi relazione da elaborare in modo incrementale. Quando si specifica un query e un table_specification insieme, lo schema della tabella specificato in table_specification deve contenere tutte le colonne restituite da query, in caso contrario viene visualizzato un errore. Qualsiasi colonna specificata in table_specification ma non restituita da query restituisce i valori null quando viene eseguita una query.

Differenze tra tabelle di streaming e altre tabelle

Le tabelle di streaming sono tabelle con stato, progettate per gestire ogni riga una sola volta durante l'elaborazione di un set di dati in crescita. Poiché la maggior parte dei set di dati aumenta continuamente nel tempo, le tabelle di streaming sono valide per la maggior parte dei carichi di lavoro di inserimento. Le tabelle di streaming sono ottimali per le pipeline che richiedono aggiornamento dei dati e bassa latenza. Le tabelle di streaming possono essere utili anche per le trasformazioni su larga scala, poiché i risultati possono essere calcolati in modo incrementale man mano che arrivano nuovi dati, mantenendo i risultati aggiornati senza dover ricompilare completamente tutti i dati di origine a ogni aggiornamento. Le tabelle di streaming sono progettate per le origini dati che sono solo di accodamento.

Le tabelle di streaming accettano comandi aggiuntivi, ad esempio REFRESH, che elabora i dati più recenti disponibili nelle origini fornite nella query. Le modifiche apportate alla query fornita vengono riflesse solo sui nuovi dati chiamando un oggetto REFRESH, non elaborato in precedenza. Per applicare le modifiche anche ai dati esistenti, è necessario eseguire REFRESH TABLE <table_name> FULL per eseguire FULL REFRESH. L'aggiornamento completo rielabora tutti i dati disponibili nell'origine usando la definizione più recente. Non è consigliabile richiamare aggiornamenti completi sulle origini che non mantengono l'intera cronologia dei dati o hanno brevi periodi di conservazione, ad esempio Kafka, perché l'aggiornamento completo tronca i dati esistenti. Potrebbe non essere possibile recuperare i dati obsoleti se i dati non sono più disponibili nell'origine.

Filtri di riga e maschere di colonna

Importante

Questa funzionalità è disponibile in anteprima pubblica.

I filtri di riga consentono di specificare una funzione che viene applicata come filtro ogni volta che un'analisi di tabella recupera righe. Questi filtri assicurano che le query successive restituiscano solo righe per le quali il predicato di filtro restituisce TRUE.

Le maschere di colonna consentono di mascherare i valori di una colonna ogni volta che un'analisi di tabella recupera le righe. Tutte le query future che coinvolgono tale colonna riceveranno il risultato della valutazione della funzione sulla colonna, sostituendo il valore originale della colonna.

Per altre informazioni su come usare filtri di riga e maschere di colonna, vedere Filtrare i dati delle tabelle sensibili usando filtri di riga e maschere di colonne.

Gestire filtri di riga e maschere di colonna

I filtri di riga e le maschere di colonna nelle tabelle di streaming devono essere aggiunti, aggiornati o eliminati tramite l'istruzione CREATE OR REFRESH.

Comportamento

  • Aggiornare come Definer: quando le istruzioni CREATE OR REFRESH o REFRESH aggiornano una tabella di streaming, le funzioni di filtro di riga vengono eseguite con i diritti del definitore (in quanto proprietario della tabella). Ciò significa che l'aggiornamento della tabella usa il contesto di sicurezza dell'utente che ha creato la tabella di streaming.
  • Query: mentre la maggior parte dei filtri viene eseguita con i diritti del definer, le funzioni che controllano il contesto utente (ad esempio CURRENT_USER e IS_MEMBER) costituiscono un’eccezione. Queste funzioni vengono eseguite come invoker. Questo approccio applica controlli di accesso e sicurezza dei dati specifici dell'utente in base al contesto dell'utente corrente.

Osservabilità

Usare DESCRIBE EXTENDED, INFORMATION_SCHEMA o Esplora cataloghi per esaminare i filtri di riga e le maschere di colonna esistenti applicabili a una determinata tabella di streaming. Questa funzionalità consente agli utenti di controllare ed esaminare le misure di accesso e protezione dei dati nelle tabelle di streaming.

Limiti

  • Solo i proprietari delle tabelle possono aggiornare le tabelle di streaming per ottenere i dati più recenti.

  • I comandi ALTER TABLE non sono consentiti nelle tabelle di streaming. La definizione e le proprietà della tabella devono essere modificate tramite l'istruzione CREATE OR REFRESH o ALTER STREAMING TABLE.

  • Le query di spostamento temporale non sono supportate.

  • L'evoluzione dello schema di tabella tramite comandi DML come INSERT INTO e MERGE non è supportata.

  • I seguenti comandi non sono supportati nelle tabelle di streaming:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • La condivisione Delta non è supportata.

  • La ridenominazione della tabella o la modifica del proprietario non sono supportate.

  • I vincoli di tabella, ad esempio PRIMARY KEY e FOREIGN KEY, non sono supportati.

  • Le colonne generate, le colonne Identity e le colonne predefinite non sono supportate.

Esempi

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')