read_pulsar
funzione con valori di tabella di streaming
Si applica a: Databricks SQL Databricks Runtime 14.1 e versioni successive
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Restituisce una tabella con record letti da Pulsar.
Questa funzione con valori di tabella supporta solo lo streaming e non la query batch.
Sintassi
read_pulsar ( { option_key => option_value } [, ...] )
Argomenti
Questa funzione richiede la chiamata di parametri denominati per le chiavi di opzione.
Le opzioni serviceUrl
e topic
sono obbligatorie.
Le descrizioni degli argomenti sono brevi qui. Per le descrizioni estese, vedere la documentazione di Pulsar di streaming strutturato.
Opzione | Type | Default | Descrizione |
---|---|---|---|
serviceUrl | STRING | Obbligatorio | URI del servizio Pulsar. |
argomento | STRING | Obbligatorio | Argomento da cui leggere. |
predefinedSubscription | STRING | None | Nome di sottoscrizione predefinito usato dal connettore per tenere traccia dello stato dell'applicazione Spark. |
subscriptionPrefix | STRING | None | Prefisso usato dal connettore per generare una sottoscrizione casuale per tenere traccia dello stato dell'applicazione Spark. |
pollTimeoutMs | LONG | 120000 | Timeout per la lettura dei messaggi da Pulsar in millisecondi. |
failOnDataLoss | BOOLEAN | true | Controlla se non eseguire una query quando i dati vengono persi( ad esempio, gli argomenti vengono eliminati o i messaggi vengono eliminati a causa dei criteri di conservazione). |
startingOffsets | STRING | più recente | Punto iniziale all'avvio di una query, prima, più recente o stringa JSON che specifica un offset specifico. Se più recente, il lettore legge i record più recenti dopo l'avvio dell'esecuzione. Se meno recente, il lettore legge dall'offset meno recente. L'utente può anche specificare una stringa JSON che specifica un offset specifico. |
startingTime | STRING | None | Se specificato, l'origine Pulsar leggerà i messaggi a partire dalla posizione del valore startingTime specificato. |
Per l'autenticazione del client pulsar vengono usati gli argomenti seguenti:
Opzione | Type | Default | Descrizione |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | None | Nome del plug-in di autenticazione. |
pulsarClientAuthParams | STRING | None | Parametri per il plug-in di autenticazione. |
pulsarClientUseKeyStoreTls | STRING | None | Indica se usare KeyStore per l'autenticazione tls. |
pulsarClientTlsTrustStoreType | STRING | None | Tipo di file TrustStore per l'autenticazione tls. |
pulsarClientTlsTrustStorePath | STRING | None | Percorso del file TrustStore per l'autenticazione tls. |
pulsarClientTlsTrustStorePassword | STRING | None | Password trustStore per l'autenticazione tls. |
Questi argomenti vengono usati per la configurazione e l'autenticazione del controllo di ammissione pulsar, la configurazione dell'amministratore pulsar è necessaria solo quando il controllo di ammissione è abilitato (quando è impostato maxBytesPerTrigger)
Opzione | Type | Default | Descrizione |
---|---|---|---|
maxBytesPerTrigger | bigint | None | Limite flessibile del numero massimo di byte da elaborare per microbatch. Se questa opzione è specificata, è necessario specificare anche admin.url. |
adminUrl | STRING | None | Configurazione del servizio PulsarHttpUrl. È necessario solo quando viene specificato maxBytesPerTrigger. |
pulsarAdminAuthPlugin | STRING | None | Nome del plug-in di autenticazione. |
pulsarAdminAuthParams | STRING | None | Parametri per il plug-in di autenticazione. |
pulsarClientUseKeyStoreTls | STRING | None | Indica se usare KeyStore per l'autenticazione tls. |
pulsarAdminTlsTrustStoreType | STRING | None | Tipo di file TrustStore per l'autenticazione tls. |
pulsarAdminTlsTrustStorePath | STRING | None | Percorso del file TrustStore per l'autenticazione tls. |
pulsarAdminTlsTrustStorePassword | STRING | None | Password trustStore per l'autenticazione tls. |
Valori restituiti
Tabella di record pulsar con lo schema seguente.
__key STRING NOT NULL
: chiave del messaggio Pulsar.value BINARY NOT NULL
: valore del messaggio Pulsar.Nota: per gli argomenti con lo schema Avro o JSON, invece di caricare il contenuto in un campo valore binario, il contenuto verrà espanso per mantenere i nomi dei campi e i tipi di campo dell'argomento Pulsar.
__topic STRING NOT NULL
: nome dell'argomento Pulsar.__messageId BINARY NOT NULL
: ID messaggio Pulsar.__publishTime TIMESTAMP NOT NULL
: tempo di pubblicazione del messaggio Pulsar.__eventTime TIMESTAMP NOT NULL
: ora dell'evento del messaggio Pulsar.__messageProperties MAP<STRING, STRING>
: proprietà del messaggio Pulsar.
Esempi
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.