Laden von Daten mithilfe von Streamingtabellen in Databricks SQL
Databricks empfiehlt die Verwendung von Streamingtabellen zum Erfassen von Daten mit Databricks SQL. Eine Streamingtabelle ist eine bei Unity Catalog registrierte Tabelle mit zusätzlicher Unterstützung für Streaming oder inkrementelle Datenverarbeitung. Für jede Streamingtabelle wird automatisch eine Delta Live Tabellen Pipeline erstellt. Sie können Streamingtabellen für das inkrementelle Laden von Daten von Kafka und vom Cloudobjektspeicher verwenden.
In diesem Artikel wird die Verwendung von Streamingtabellen zum Laden von Daten aus dem Cloudobjektspeicher veranschaulicht, der als Unity Catalog-Volume (empfohlen) oder externer Speicherort konfiguriert ist.
Hinweis
Informationen zum Verwenden von Delta Lake-Tabellen als Streamingquellen und -senken finden Sie unter Delta-Tabelle: Streaming für Lese- und Schreibvorgänge.
Vorbemerkungen
Bevor Sie beginnen, müssen die folgenden Voraussetzungen erfüllt sein.
Anforderungen an den Arbeitsbereich:
- Ein Azure Databricks-Konto mit aktiviertem serverlosen Konto. Weitere Informationen finden Sie unter Aktivieren von serverlosen SQL-Warehouses.
- Einen Arbeitsbereich, für den Unity Catalog aktiviert ist. Weitere Informationen finden Sie unter Einrichten und Verwalten von Unity Catalog.
Computeanforderungen:
Sie müssen eine der folgenden Voraussetzungen verwenden:
Ein SQL-Warehouse, das den
Current
-Kanal verwendet.Compute im Modus für gemeinsamen Zugriff in Databricks Runtime 13.3 LTS oder höher.
Compute with single user access mode on Databricks Runtime 15.4 LTS or above.
Auf Databricks Runtime 15.3 und unten können Sie keine einzelnen Benutzer berechnen, um Streamingtabellen abzufragen, die im Besitz anderer Benutzer sind. Sie können single user compute on Databricks Runtime 15.3 and below only if you own the streaming table verwenden. Der Ersteller der Tabelle ist der Besitzer.
Databricks Runtime 15.4 LTS und höher unterstützen Abfragen zu Delta Live Tables-generierten Tabellen bei der Berechnung einzelner Benutzer, unabhängig vom Tabellenbesitz. Um die in Databricks Runtime 15.4 LTS und höher bereitgestellten Datenfilter nutzen zu können, müssen Sie auch überprüfen, ob Ihr Arbeitsbereich für serverloses Computing aktiviert ist, da die Datenfilterfunktion, die von Delta Live Tables generierte Tabellen unterstützt, auf serverlosem Computing ausgeführt wird. Sie können serverlose Computeressourcen in Rechnung stellen, wenn Sie datenfilterungsvorgänge mit einem einzelnen Benutzer ausführen. Weitere Informationen finden Sie unter Feinkornierte Zugriffssteuerung für die Berechnung einzelner Benutzer.
Berechtigungsanforderungen:
- Die
READ FILES
-Berechtigung für einen externen Unity Catalog-Speicherort. Weitere Informationen finden Sie unter Erstellen eines externen Speicherorts zum Verbinden des Cloudspeichers mit Azure Databricks. - Die
USE CATALOG
-Berechtigung für den Katalog, in dem Sie die Streamingtabelle erstellen. - Die
USE SCHEMA
-Berechtigung für das Schema, in dem Sie die Streamingtabelle erstellen. - Die
CREATE TABLE
-Berechtigung für das Schema, in dem Sie die Streamingtabelle erstellen.
Sonstige Anforderungen:
Der Pfad zu Ihren Quelldaten.
Volumepfadbeispiel:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
Beispiel für externen Speicherortpfad:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
Hinweis
In diesem Artikel wird davon ausgegangen, dass sich die Daten, die Sie laden möchten, an einem Cloudspeicherort befinden, der einem Unity Catalog-Volume oder einem externen Speicherort entspricht, auf den Sie Zugriff haben.
Ermitteln und Anzeigen einer Vorschau von Quelldaten
Klicken Sie in der Seitenleiste Ihres Arbeitsbereichs auf Abfragen, und klicken Sie dann auf Abfrage erstellen.
Wählen Sie im Abfrage-Editor in der Dropdownliste ein SQL-Warehouse aus, das den
Current
-Kanal verwendet.Fügen Sie Folgendes in den Editor ein. Ersetzen Sie dabei die Werte in spitzen Klammern (
<>
) durch die Informationen, die Ihre Quelldaten identifizieren, und klicken Sie dann auf Ausführen.Hinweis
Beim Ausführen der Tabellenwertfunktion
read_files
treten möglicherweise Schemarückschlussfehler auf, wenn die Standardwerte für die Funktion Ihre Daten nicht analysieren können. Beispielsweise müssen Sie möglicherweise den mehrzeiligen Modus für mehrzeilige CSV- oder JSON-Dateien konfigurieren. Eine Liste der Parseroptionen finden Sie unter Tabellenwertfunktion read_files./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
Laden von Daten in eine Streamingtabelle
Um eine Streamingtabelle aus Daten im Cloudobjektspeicher zu erstellen, fügen Sie Folgendes in den Abfrage-Editor ein, und klicken Sie dann auf Ausführen:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
Aktualisieren einer Streamingtabelle mithilfe einer DLT-Pipeline
In diesem Abschnitt werden Muster zum Aktualisieren einer Streamingtabelle mit den neuesten Daten beschrieben, die aus den in der Abfrage definierten Quellen zur Verfügung stehen.
Wenn Sie CREATE
oder REFRESH
eine Streamingtabelle eine Streamingtabelle verwenden, werden die Aktualisierungsprozesse mithilfe einer serverlosen Delta Live Tables-Pipeline verarbeitet. Jede von Ihnen definierte Streamingtabelle verfügt über eine zugeordnete Delta Live Tables-Pipeline.
Nachdem Sie den REFRESH
-Befehl ausgeführt haben, wird der DLT-Pipelinelink zurückgegeben. Sie können den DLT-Pipelinelink verwenden, um den Status der Aktualisierung zu überprüfen.
Hinweis
Nur der Tabellenbesitzer kann eine Streamingtabelle aktualisieren, um die neuesten Daten abzurufen. Benutzer*innen, die die Tabelle erstellen, sind die Besitzer*innen, und Besitzer*innen können nicht geändert werden. Möglicherweise müssen Sie Ihre Streamingtabelle aktualisieren, bevor Sie Zeitreiseabfragen verwenden.
Weitere Informationen finden Sie unter Was sind Delta Live-Tabellen?.
Nur Erfassen neuer Daten
Standardmäßig liest die read_files
-Funktion alle vorhandenen Daten im Quellverzeichnis während der Tabellenerstellung und verarbeitet dann bei jeder Aktualisierung neu eintreffende Datensätze.
Legen Sie die Option includeExistingFiles
auf false
fest, um zu vermeiden, dass Daten erfasst werden, die zum Zeitpunkt der Tabellenerstellung bereits im Quellverzeichnis vorhanden sind. Dies bedeutet, dass nur Daten verarbeitet werden, die nach der Tabellenerstellung im Verzeichnis eingehen. Beispiel:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
Vollständiges Aktualisieren einer Streamingtabelle
Vollständige Aktualisierungen verarbeiten alle in der Quelle verfügbaren Daten mit der neuesten Definition erneut. Es wird nicht empfohlen, vollständige Aktualisierungen für Quellen aufzurufen, die nicht den gesamten Datenverlauf beibehalten oder kurze Aufbewahrungszeiträume aufweisen, wie z. B. Kafka, da durch eine vollständige Aktualisierung die vorhandenen Daten abgeschnitten werden. Möglicherweise können Sie alte Daten nicht wiederherstellen, wenn die Daten in der Quelle nicht mehr verfügbar sind.
Beispiel:
REFRESH STREAMING TABLE my_bronze_table FULL
Planen einer Streamingtabelle für die automatische Aktualisierung
Um eine Streamingtabelle für die automatische Aktualisierung basierend auf einem definierten Zeitplan zu konfigurieren, fügen Sie Folgendes in den Abfrage-Editor ein, und klicken Sie dann auf Ausführen:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
Aktualisierungszeitplanabfragen finden Sie beispielsweise unter ALTER STREAMING TABLE.
Nachverfolgen des Status einer Aktualisierung
Sie können den Status einer Streamingtabellenaktualisierung anzeigen, indem Sie die Pipeline anzeigen, die die Streamingtabelle auf der Benutzeroberfläche von Delta Live Tables verwaltet, oder indem Sie die vom DESCRIBE EXTENDED
-Befehl für die Streamingtabelle zurückgegebenen Aktualisierungsinformationen anzeigen.
DESCRIBE EXTENDED <table-name>
Streamingerfassung von Kafka
Ein Beispiel für die Streamingerfassung von Kafka finden Sie unter read_kafka.
Gewähren des Zugriffs auf eine Streamingtabelle für Benutzer*innen
Um Benutzer*innen die SELECT
-Berechtigung für die Streamingtabelle zu gewähren, damit sie sie abfragen können, fügen Sie Folgendes in den Abfrage-Editor ein, und klicken Sie dann auf Ausführen:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Weitere Informationen zum Gewähren von Berechtigungen für sicherungsfähige Unity Catalog-Objekte finden Sie unter Unity Catalog-Berechtigungen und sicherungsfähige Objekte.