Delta Live Tables: SQL-Sprachreferenz

Dieser Artikel enthält ausführliche Informationen zur SQL-Programmierschnittstelle für Delta Live Tables.

Sie können benutzerdefinierte Python-Funktionen (User-Defined Functions, UDFs) in Ihren SQL-Abfragen verwenden, aber Sie müssen diese UDFs in Python-Dateien definieren, bevor Sie sie in SQL-Quelldateien aufrufen. Weitere Informationen finden Sie unter Benutzerdefinierte Skalarfunktionen: Python.

Begrenzungen

Die PIVOT-Klausel wird nicht unterstützt. Der pivot-Vorgang in Spark erfordert Eager Loading von Eingabedaten, um das Ausgabeschema zu berechnen. Diese Funktion wird in Delta Live Tables nicht unterstützt.

Erstellen einer materialisierten Sicht oder Streamingtabelle für Delta Live Tables

Hinweis

  • Die CREATE OR REFRESH LIVE TABLE-Syntax zum Erstellen einer materialisierten Ansicht ist veraltet. Verwenden Sie stattdessen CREATE OR REFRESH MATERIALIZED VIEW.
  • Damit die CLUSTER BY-Klausel zum Aktivieren des Liquid Clustering verwendet werden kann, muss Ihre Pipeline für die Verwendung des Vorschaukanals konfiguriert sein.

Sie verwenden dieselbe grundlegende SQL-Syntax, wenn Sie eine Streamingtabelle oder eine materialisierte Sicht deklarieren.

Deklarieren einer materialisierten Ansicht mit Delta Live Tables mit SQL

Im Folgenden wird die Syntax zum Deklarieren einer materialisierten Ansicht in Delta Live Tables mit SQL beschrieben:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Deklarieren einer Streamingtabelle mit Delta Live Tables mit SQL

Sie können Streamingtabellen nur mithilfe von Abfragen für einen Streaminglesevorgang deklarieren. Databricks empfiehlt die Verwendung des Autoloaders für die Streamingerfassung von Dateien aus dem Cloudobjektspeicher. Weitere Informationen finden Sie unter SQL-Syntax des Autoloaders.

Sie müssen die STREAM()-Funktion um einen Datasetnamen einschließen, wenn Sie andere Tabellen oder Sichten in Ihrer Pipeline als Streamingquelle angeben.

Im Folgenden wird die Syntax zum Deklarieren einer Streamingtabelle in Delta Live Tables mit SQL beschrieben:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Erstellen einer Delta Live Tables-Sicht

Nachfolgend wird die Syntax zum Deklarieren von Sichten mit SQL beschrieben:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

SQL-Syntax des Autoloaders

Im Folgenden wird die Syntax für die Arbeit mit dem Autoloader in SQL beschrieben:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Sie können unterstützte Formatoptionen mit dem Autoloader verwenden. Mithilfe der map()-Funktion können Sie Optionen an die read_files()-Methode übergeben. Die Optionen sind Schlüssel-Wert-Paare, bei denen die Schlüssel und Werte Zeichenfolgen sind. Ausführliche Informationen zur Unterstützung von Formaten und Optionen finden Sie unter Dateiformatoptionen.

Beispiel: Definieren von Tabellen

Sie können ein Dataset erstellen, indem Sie aus einer externen Datenquelle oder aus Datasets lesen, die in einer Pipeline definiert sind. Um aus einem internen Dataset zu lesen, stellen Sie dem Datasetnamen das Schlüsselwort LIVE voran. Im folgenden Beispiel werden zwei verschiedene Datasets definiert: eine Tabelle namens taxi_raw, die eine JSON-Datei als Eingabequelle verwendet, und eine Tabelle namens filtered_data, die die Tabelle taxi_raw als Eingabe verwendet:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Beispiel: Lesen aus einer Streamingquelle

Um Daten aus einer Streamingquelle zu lesen (z. B. Autoloader oder internes Dataset), definieren Sie eine STREAMING-Tabelle:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Weitere Informationen zu Streamingdaten finden Sie unter Transformieren von Daten mit Delta Live Tables.

Steuern der Materialisierung von Tabellen

Tabellen bieten auch zusätzliche Steuerung ihrer Materialisierung:

  • Geben Sie an, wie Tabellen mit PARTITIONED BYpartitioniert werden. Sie können die Partitionierung verwenden, um Abfragen zu beschleunigen.
  • Sie können Tabelleneigenschaften mithilfe von TBLPROPERTIES festlegen. Weitere Informationen finden Sie unter Delta Live Tables-Tabelleneigenschaften.
  • Legen Sie mithilfe der LOCATION-Einstellung einen Speicherort fest. Standardmäßig werden Tabellendaten am Speicherort der Pipeline gespeichert, wenn LOCATION nicht festgelegt ist.
  • Sie können Generierte Spalten in Ihrer Schemadefinition verwenden. Weitere Informationen finden Sie unter Beispiel: Angeben des Schemas und der Partitionsspalten.

Hinweis

Für Tabellen mit einer Größe von weniger als 1 TB empfiehlt Databricks, Delta Live Tables die Strukturierung der Daten steuern zu lassen. Wenn Sie nicht davon ausgehen, dass die Größe Ihrer Tabelle über ein Terabyte hinausgeht, empfiehlt Databricks, dass Sie keine Partitionsspalten angeben.

Beispiel: Angeben des Schemas und der Partitionsspalten

Sie können optional ein Schema angeben, wenn Sie eine Tabelle definieren. Im folgenden Beispiel wird das Schema für die Zieltabelle angegeben, einschließlich der Verwendung von den von Delta Lake generierten Spalten und der Definition von Partitionsspalten für die Tabelle:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Standardmäßig leitet Delta Live Tables das Schema aus der table-Definition ab, wenn Sie kein Schema angeben.

Beispiel: Definieren von Tabellenconstraints

Hinweis

Die Unterstützung von Delta Live Tables für Tabellenconstraints befindet sich in der Public Preview. Um Tabelleneinschränkungens zu definieren, muss Ihre Pipeline eine Unity Catalog-fähige Pipeline sein und für die Verwendung des preview Kanals konfiguriert sein.

Beim Angeben eines Schemas können Sie Primär- und Fremdschlüssel definieren. Die Einschränkungen dienen der Information und werden nicht erzwungen. Weitere Informationen finden Sie unter der CONSTRAINT-Klausel in der SQL-Sprachreferenz.

Im folgenden Beispiel wird eine Tabelle mit einer Primär- und Fremdschlüsseleinschränkung definiert:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parametrisieren von Werten, die beim Deklarieren von Tabellen oder Sichten mit SQL verwendet werden

Verwenden Sie SET, um einen Konfigurationswert in einer Abfrage, die eine Tabelle oder Sicht deklariert, anzugeben, einschließlich der Spark-Konfigurationen. Jede Tabelle oder Sicht, die Sie in einem Notebook nach der SET-Anweisung definieren, hat Zugriff auf den definierten Wert. Alle Spark-Konfigurationen, die mit der SET-Anweisung angegeben werden, werden verwendet, wenn die Spark-Abfrage für eine Tabelle oder Sicht ausgeführt wird, die auf die SET-Anweisung folgt. Verwenden Sie zum Lesen eines Konfigurationswerts in einer Abfrage die Zeichenfolgeninterpolationssyntax ${}. Im folgenden Beispiel wird ein Spark-Konfigurationswert mit dem Namen startDate festgelegt, und dieser Wert wird in einer Abfrage verwendet:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Um mehrere Konfigurationswerte anzugeben, verwenden Sie eine separate SET-Anweisung für jeden Wert.

Beispiel: Definieren eines Zeilenfilters und einer Spaltenmaske

Wichtig

Zeilenfilter und Spaltenformate befinden sich im Public Preview.

Verwenden Sie die ROW FILTER-Klausel und die MASK-Klausel, um eine materialisierte Ansicht oder Streamingtabelle mit einem Zeilenfilter und spaltenformat zu erstellen. Im folgenden Beispiel wird veranschaulicht, wie sie eine materialisierte Ansicht und eine Streamingtabelle mit einem Zeilenfilter und einem Spaltenformat definieren:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

Weitere Informationen zu Zeilenfiltern und Spaltenmasken finden Sie unter Veröffentlichen von Tabellen mit Zeilenfiltern und Spaltenmasken.

SQL-Eigenschaften

Hinweis

Damit die CLUSTER BY-Klausel zum Aktivieren des Liquid Clustering verwendet werden kann, muss Ihre Pipeline für die Verwendung des Vorschaukanals konfiguriert sein.

CREATE TABLE oder VIEW
TEMPORARY

Erstellen Sie eine Tabelle, veröffentlichen Sie jedoch keine Metadaten für die Tabelle. Die Klausel TEMPORARY weist Delta Live Tables an, eine Tabelle zu erstellen, die für die Pipeline verfügbar ist, auf die aber nicht außerhalb der Pipeline zugegriffen werden sollte. Um die Verarbeitungszeit zu reduzieren, wird eine temporäre Tabelle für die Lebensdauer der Pipeline beibehalten, die sie erstellt, und nicht nur für ein einzelnes Update.
STREAMING

Erstellen einer Tabelle, die ein Eingabedataset als Stream liest. Das Eingabedataset muss eine Streamingdatenquelle sein, z. B. Autoloader oder eine STREAMING-Tabelle.
CLUSTER BY

Aktivieren Sie das Liquid Clustering in der Tabelle und definieren Sie die Spalten, die sie als Clusteringschlüssel verwenden.

Weitere Informationen finden Sie unter Verwenden von Liquid Clustering für Delta-Tabellen.
PARTITIONED BY

Eine optionale Liste einer oder mehrerer Spalten, die zum Partitionieren der Tabelle verwendet werden sollen
LOCATION

Ein optionaler Speicherort für Tabellendaten. Wenn diese Einstellung nicht festgelegt ist, verwendet das System standardmäßig den Speicherort der Pipeline.
COMMENT

Dies ist eine optionale Beschreibung für die Tabelle
column_constraint

Eine optionale informationale Primärschlüssel- oder Fremdschlüsseleinschränkung für die Spalte.
MASK clause (Public Preview)

Fügt eine Spaltenmaskierungsfunktion hinzu, um vertrauliche Zeichenfolgenwerte zu anonymisieren. Zukünftige Abfragen für diese Spalte geben das Ergebnis der ausgewerteten Funktion anstelle des ursprünglichen Werts der Spalte zurück. Dies ist nützlich für eine differenzierte Zugriffssteuerung, da die Funktion die Identität und die Gruppenmitgliedschaften des Benutzers überprüfen kann, um zu entscheiden, ob der Wert maskiert werden soll.

Siehe Spaltenmaskenklausel.
table_constraint

Eine optionale informationale Primärschlüssel- oder Fremdschlüsseleinschränkung in der Tabelle.
TBLPROPERTIES

Eine optionale Liste der Tabelleneigenschaften für die Tabelle
WITH ROW FILTER clause (Public Preview)

Fügt der Tabelle eine Zeilenfilterfunktion hinzu. Zukünftige Abfragen für diese Tabelle erhalten eine Teilmenge der Zeilen, für die die Funktion als TRUE ausgewertet wird. Dies ist für eine differenzierte Zugriffssteuerung nützlich, da die Funktion dadurch die Identität oder Gruppenmitgliedschaften des aufrufenden Benutzers überprüfen kann, um zu entscheiden, ob bestimmte Zeilen gefiltert werden sollen.

Siehe ROW FILTER-Klausel.
select_statement

Eine Delta Live Tables-Abfrage, die das Dataset für die Tabelle definiert
CONSTRAINT-Klausel
EXPECT expectation_name

Definiert die Einschränkung expectation_name für die Datenqualität. Wenn die Einschränkung ON VIOLATION nicht definiert ist, fügen Sie dem Zieldataset Zeilen hinzu, die gegen die Einschränkung verstoßen.
ON VIOLATION

Optionale Aktion für fehlerhafte Zeilen:

- FAIL UPDATE: Die Pipelineausführung wird sofort beendet.
- DROP ROW: Der Datensatz wird abgelegt, und die Verarbeitung wird fortgesetzt.

Change Data Capture mit SQL in Delta Live Tables

Verwenden Sie die APPLY CHANGES INTO-Anweisung, um die CDC-Funktionalität von Delta Live Tables wie folgt zu verwenden:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Sie definieren Datenqualitätseinschränkungen für ein APPLY CHANGES-Ziel unter Verwendung derselben CONSTRAINT-Klausel wie für Abfragen, die nicht vom Typ APPLY CHANGES sind. Siehe Verwalten der Datenqualität mit Delta Live Tables.

Hinweis

Das Standardverhalten für INSERT- und UPDATE-Ereignisse ist das Ausführen eines Upserts von CDC-Ereignissen aus der Quelle: das Aktualisieren aller Zeilen in der Zieltabelle, die mit den angegebenen Schlüsseln übereinstimmen, oder das Einfügen einer neuen Zeile, wenn kein übereinstimmender Datensatz in der Zieltabelle vorhanden ist. Die Behandlung von DELETE-Ereignissen kann mit der APPLY AS DELETE WHEN-Bedingung angegeben werden.

Wichtig

Sie müssen eine Zielstreamingtabelle deklarieren, auf die Änderungen angewendet werden sollen. Optional können Sie das Schema für Ihre Zieltabelle angeben. Wenn Sie das Schema der APPLY CHANGES-Zieltabelle angeben, müssen Sie auch die Spalten __START_AT und __END_AT mit demselben Datentyp wie das Feld sequence_by angeben.

Weitere Informationen finden Sie unter Die APPLY CHANGES-APIs: Vereinfachen der Änderungsdatenerfassung mit Delta Live Tables.

Klauseln
KEYS

Die Spalte oder Kombination von Spalten, die eine Zeile in den Quelldaten eindeutig identifiziert. Damit wird ermittelt, welche CDC-Ereignisse für bestimmte Datensätze in der Zieltabelle gelten.

Verwenden Sie zum Definieren einer Kombination von Spalten eine durch Trennzeichen getrennte Liste von Spalten.

Diese Klausel ist erforderlich.
IGNORE NULL UPDATES

Ermöglicht das Erfassen von Updates, die eine Teilmenge der Zielspalten enthalten. Wenn ein CDC-Ereignis mit einer vorhandenen Zeile übereinstimmt und IGNORE NULL UPDATES angegeben ist, behalten Spalten mit einer null ihre vorhandenen Werte im Ziel bei. Dies gilt auch für geschachtelte Spalten mit dem Wert null.

Diese Klausel ist optional.

Die Standardeinstellung ist das Überschreiben vorhandener Spalten mit null-Werten.
APPLY AS DELETE WHEN

Gibt an, wann ein CDC-Ereignis als DELETE und nicht als Upsert behandelt werden soll. Um nicht sortierte Daten zu verarbeiten, wird die gelöschte Zeile vorübergehend als Tombstone in der zugrunde liegenden Delta-Tabelle beibehalten, und im Metastore wird eine Sicht erstellt, die diese Tombstones herausfiltert. Das Aufbewahrungsintervall kann konfiguriert werden mit:
pipelines.cdc.tombstoneGCThresholdInSeconds Tabelleneigenschaft.

Diese Klausel ist optional.
APPLY AS TRUNCATE WHEN

Gibt an, wann ein CDC-Ereignis als TRUNCATE der gesamten Tabelle behandelt werden sollte. Da diese Klausel die vollständige Abschneidung der Zieltabelle auslöst, sollte sie nur in bestimmten Anwendungsfälle verwendet werden, die die Nutzung dieser Funktion erfordern.

Die APPLY AS TRUNCATE WHEN-Klausel wird nur für den SCD-Typ 1 unterstützt. Der SCD-Typ 2 unterstützt nicht den Abkürzungsvorgang.

Diese Klausel ist optional.
SEQUENCE BY

Der Spaltenname, der die logische Reihenfolge der CDC-Ereignisse in den Quelldaten angibt. Delta Live Tables verwendet diese Sequenzierung, um Änderungsereignisse zu behandeln, die in nicht ordnungsgemäßer Reihenfolge eingehen.

Die angegebene Spalte muss ein sortierbarer Datentyp sein.

Diese Klausel ist erforderlich.
COLUMNS

Gibt eine Teilmenge der Spalten an, die in die Zieltabelle eingeschlossen werden sollen. Sie haben folgende Möglichkeiten:

– Geben Sie die vollständige Liste der einzuschließenden Spalten an: COLUMNS (userId, name, city).
– Geben Sie eine Liste der auszuschließenden Spalten an: COLUMNS * EXCEPT (operation, sequenceNum).

Diese Klausel ist optional.

Standardmäßig werden alle Spalten in die Zieltabelle eingeschlossen, wenn die COLUMNS-Klausel nicht angegeben ist.
STORED AS

Gibt an, ob Datensätze als SCD-Typ 1 oder SCD-Typ 2 gespeichert werden sollen.

Diese Klausel ist optional.

Der Standardwert ist SCD-Typ 1.
TRACK HISTORY ON

Gibt eine Teilmenge der Ausgabespalten an, um Verlaufsdatensätze zu generieren, wenn Änderungen an diesen angegebenen Spalten vorgenommen werden. Sie haben folgende Möglichkeiten:

– Geben Sie die vollständige Liste der nachzuverfolgenden Spalten an: COLUMNS (userId, name, city).
– Geben Sie eine Liste von Spalten an, die von der Nachverfolgung ausgeschlossen werden sollen: COLUMNS * EXCEPT (operation, sequenceNum)

Diese Klausel ist optional. Der Standardwert ist das Nachverfolgen des Verlaufs für alle Ausgabespalten, wenn Änderungen vorhanden sind. Dies entspricht TRACK HISTORY ON *.