Caricare dati usando tabelle di streaming in Databricks SQL
Per inserire dati usando Databricks SQL, Databricks consiglia di usare le tabelle di streaming. Una tabella di streaming è una tabella registrata nel catalogo Unity con supporto aggiuntivo per lo streaming o l'elaborazione incrementale dei dati. Per ogni tabella di streaming viene creata automaticamente una pipeline Delta Live Tables. È possibile usare le tabelle di streaming per il caricamento incrementale dei dati da Kafka e dall'archiviazione di oggetti cloud.
Questo articolo illustra l'uso delle tabelle di streaming per caricare i dati dall'archiviazione di oggetti cloud configurata come volume del catalogo Unity (scelta consigliata) o come posizione esterna.
Nota
Per informazioni su come usare le tabelle Delta Lake come origini di streaming e sink, vedere Letture e scritture di tabelle Delta in streaming.
Prima di iniziare
Prima di iniziare, è necessario soddisfare i requisiti seguenti:
Requisiti dell'area di lavoro:
- Un account Azure Databricks con l'opzione serverless abilitata. Per altre informazioni, vedere Abilitare SQL Warehouse serverless.
- Un'area di lavoro con il catalogo Unity abilitato. Per altre informazioni, vedere Configurare e gestire il catalogo Unity.
Requisiti di calcolo:
È necessario usare uno dei seguenti elementi:
Un'istanza di SQL Warehouse che usa il canale
Current
.Calcolo con modalità di accesso condiviso in Databricks Runtime 13.3 LTS o versione successiva.
Calcolo con modalità di accesso utente singolo in Databricks Runtime 15.4 LTS o versione successiva.
In Databricks Runtime 15.3 e versioni successive non è possibile usare il calcolo utente singolo per eseguire query sulle tabelle di streaming di proprietà di altri utenti. È possibile usare il calcolo con utente singolo in Databricks Runtime 15.3 e versioni successive solo se si è proprietari della tabella di streaming. L'autore della tabella deve esserne proprietario.
Databricks Runtime 15.4 LTS e versioni successive supportano query sulle tabelle generate da Tabelle Live Delta in un singolo calcolo utente, indipendentemente dalla proprietà della tabella. Per sfruttare i vantaggi del filtro dei dati fornito in Databricks Runtime 15.4 LTS e versioni successive, è necessario verificare che l'area di lavoro sia abilitata per il calcolo serverless perché la funzionalità di filtro dei dati che supporta le tabelle generate da Delta Live Tables viene eseguita tramite calcolo serverless. È possibile che vengano addebitati costi per le risorse di calcolo serverless quando si usa il calcolo utente singolo per eseguire operazioni di filtro dei dati. Vedere Controllo di accesso con granularità fine per il calcolo di un singolo utente.
Requisiti di autorizzazione:
- Privilegio
READ FILES
in una posizione esterna del catalogo Unity. Per informazioni, vedere Creare una posizione esterna per connettere la memorizzazione cloud ad Azure Databricks. - Privilegio
USE CATALOG
per il catalogo in cui si crea la tabella di streaming. - Privilegio
USE SCHEMA
per lo schema in cui si crea la tabella di streaming. - Privilegio
CREATE TABLE
per lo schema in cui si crea la tabella di streaming.
Altri requisiti:
Il percorso dei dati di origine.
Esempio di percorso del volume:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
Esempio di percorso esterno:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
Nota
Questo articolo presuppone che i dati da caricare si trovino in un percorso di archiviazione cloud che corrisponde a un volume del catalogo Unity o a un percorso esterno a cui si ha accesso.
Individuare e visualizzare in anteprima i dati di origine
Nella barra laterale dell'area di lavoro cliccare su Query e quindi su Crea query.
Nell'elenco a discesa dell'editor di query selezionare un'istanza di SQL Warehouse che usa il canale
Current
.Incollare il codice seguente nell'editor, sostituendo i valori tra parentesi acute (
<>
) per le informazioni che identificano i dati di origine, quindi cliccare su Esegui.Nota
Se le impostazioni predefinite per la funzione non sono in grado di analizzare i dati, è possibile che si verifichino errori di inferenza dello schema durante l'esecuzione della funzione con valori di tabella
read_files
. Ad esempio, potrebbe essere necessario configurare la modalità a più righe per i file CSV o JSON su più righe. Per un elenco delle opzioni del parser, vedere funzione con valori di tabella read_files./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
Caricare dati in una tabella di streaming
Per creare una tabella di streaming a partire dai dati nell'archiviazione di oggetti cloud, incollare quanto segue nell'editor di query e quindi fare clic su Esegui:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
Aggiornare una tabella di streaming usando una pipeline DLT
Questa sezione descrive i modelli per aggiornare una tabella di streaming con i dati più recenti disponibili dalle origini definite nella query.
Quando si CREATE
o REFRESH
una tabella di streaming, l'aggiornamento elabora usando una pipeline di tabelle live Delta serverless. Ogni tabella di streaming definita ha una pipeline delta live tables associata.
Dopo aver eseguito il comando REFRESH
, viene restituito il collegamento della pipeline DLT. È possibile usare il collegamento della pipeline DLT per controllare lo stato dell'aggiornamento.
Nota
Solo il proprietario della tabella può aggiornare una tabella di streaming per ottenere i dati più recenti. L'utente che crea la tabella è il proprietario e il proprietario non può essere modificato. Potrebbe essere necessario aggiornare la tabella di streaming prima di usare query di spostamento temporale.
Vedere Che cos'è Delta Live Tables?.
Inserire nuovi dati
Per impostazione predefinita, la funzione read_files
legge tutti i dati esistenti nella directory di origine durante la creazione della tabella e quindi elabora i nuovi record in arrivo con ogni aggiornamento.
Per evitare di inserire dati già esistenti nella directory di origine al momento della creazione della tabella, impostare l'opzione includeExistingFiles
su false
. Ciò significa che vengono elaborati solo i dati che arrivano nella directory dopo la creazione della tabella. Ad esempio:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
Aggiornare completamente una tabella di streaming
L'aggiornamento completo rielabora tutti i dati disponibili nell'origine usando la definizione più recente. Non è consigliabile richiamare aggiornamenti completi sulle origini che non mantengono l'intera cronologia dei dati o hanno brevi periodi di conservazione, ad esempio Kafka, perché l'aggiornamento completo tronca i dati esistenti. Potrebbe non essere possibile recuperare i dati obsoleti se i dati non sono più disponibili nell'origine.
Ad esempio:
REFRESH STREAMING TABLE my_bronze_table FULL
Pianificare una tabella di streaming per l'aggiornamento automatico
Per configurare una tabella di streaming per l'aggiornamento automatico in base a una pianificazione definita, incollare quanto segue nell'editor di query e quindi fare clic su Esegui:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
Ad esempio, aggiornare le query di pianificazione; vedere ALTER STREAMING TABLE.
Tenere traccia dello stato di un aggiornamento
È possibile visualizzare lo stato di un aggiornamento della tabella di streaming visualizzando la pipeline che gestisce la tabella di streaming nell'interfaccia utente di Delta Live Tables oppure visualizzando le informazioni di aggiornamento restituite dal comando DESCRIBE EXTENDED
per la tabella di streaming.
DESCRIBE EXTENDED <table-name>
Inserimento in streaming da Kafka
Per un esempio di inserimento in streaming da Kafka, vedere read_kafka.
Concedere agli utenti l'accesso a una tabella di streaming
Per concedere agli utenti il privilegio SELECT
nella tabella di streaming in modo che possano eseguire query, incollare quanto segue nell'editor di query, quindi fare clic su Esegui:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Per altre informazioni sulla concessione di sugli oggetti a protezione diretta del catalogo Unity, vedere Privilegi e oggetti a protezione diretta del catalogo Unity.