read_pulsar
streamovaná funkce s hodnotou tabulky
Platí pro: Databricks SQL Databricks Runtime 14.1 a vyšší
Důležité
Tato funkce je ve verzi Public Preview.
Vrátí tabulku se záznamy načtenými z Pulsaru.
Tato funkce s hodnotou tabulky podporuje pouze streamování, nikoli dávkové dotazy.
Syntaxe
read_pulsar ( { option_key => option_value } [, ...] )
Argumenty
Tato funkce vyžaduje vyvolání pojmenovaného parametru pro klíče možností.
serviceUrl
Možnosti a topic
jsou povinné.
Popisy argumentů jsou zde stručné. Další popisy najdete v dokumentaci ke strukturovanému streamování Pulsear .
Možnost | Typ | Výchozí | Popis |
---|---|---|---|
serviceUrl | STRING | Povinné | Identifikátor URI služby Pulsear. |
topic | STRING | Povinné | Téma, ze které se má číst. |
předdefinovaná podsítě | STRING | Nic | Předdefinovaný název předplatného používaný konektorem ke sledování průběhu aplikace Spark. |
subscriptionPrefix | STRING | Nic | Předpona používaná konektorem ke generování náhodného předplatného ke sledování průběhu aplikace Spark. |
pollTimeoutMs | DLOUHÝ | 120000 | Časový limit pro čtení zpráv z Pulsaru v milisekundách. |
failOnDataLoss | BOOLEOVSKÝ | true | Určuje, jestli se dotaz nezdaří, když dojde ke ztrátě dat (například témata se odstraní nebo zprávy se odstraní kvůli zásadám uchovávání informací). |
startingOffsets | STRING | nejnovější | Počáteční bod při spuštění dotazu, buď nejstaršího, nejnovějšího, nebo řetězce JSON, který určuje konkrétní posun. Pokud je nejnovější, přečte si čtenář nejnovější záznamy po spuštění. Pokud je nejstarší, čtečka čte od nejstaršího posunu. Uživatel může také zadat řetězec JSON, který určuje konkrétní posun. |
počáteční čas | STRING | Nic | Při zadání bude zdroj Pulsear číst zprávy od pozice zadaného počátečního času. |
Pro ověřování pulzárního klienta se používají následující argumenty:
Možnost | Typ | Výchozí | Popis |
---|---|---|---|
pulsearClientAuthPluginClassName | STRING | Nic | Název ověřovacího modulu plug-in. |
pulsearClientAuthParams | STRING | Nic | Parametry ověřovacího modulu plug-in. |
pulsearClientUseKeyStoreTls | STRING | Nic | Určuje, jestli se má úložiště klíčů používat pro ověřování tls. |
pulsearClientTlsTrustStoreType | STRING | Nic | Typ souboru TrustStore pro ověřování tls. |
pulsearClientTlsTrustStorePath | STRING | Nic | Cesta k souboru TrustStore pro ověřování tls. |
pulsearClientTlsTrustStorePassword | STRING | Nic | Heslo TrustStore pro ověřování tls |
Tyto argumenty se používají pro konfiguraci a ověřování řízení pulzárního přístupu, konfigurace pulsar admin se vyžaduje pouze v případě, že je povoleno řízení přístupu (pokud je nastaven maxBytesPerTrigger).
Možnost | Typ | Výchozí | Popis |
---|---|---|---|
maxBytesPerTrigger | BIGINT | Nic | Měkký limit maximálního počtu bajtů, které chceme zpracovat na mikrobatch. Pokud je zadáno, musí být zadána také adresa admin.url. |
adminUrl | STRING | Nic | Konfigurace Pulsear serviceHttpUrl. Vyžaduje se pouze v případě, že je zadán maxBytesPerTrigger. |
pulsearAdminAuthPlugin | STRING | Nic | Název ověřovacího modulu plug-in. |
pulsearAdminAuthParams | STRING | Nic | Parametry ověřovacího modulu plug-in. |
pulsearClientUseKeyStoreTls | STRING | Nic | Určuje, jestli se má úložiště klíčů používat pro ověřování tls. |
pulsearAdminTlsTrustStoreType | STRING | Nic | Typ souboru TrustStore pro ověřování tls. |
pulsearAdminTlsTrustStorePath | STRING | Nic | Cesta k souboru TrustStore pro ověřování tls. |
pulsearAdminTlsTrustStorePassword | STRING | Nic | Heslo TrustStore pro ověřování tls |
Návraty
Tabulka pulsárních záznamů s následujícím schématem.
__key STRING NOT NULL
: Pulsar message key.value BINARY NOT NULL
: Hodnota pulzární zprávy.Poznámka: Pro témata se schématem Avro nebo JSON se místo načítání obsahu do pole binární hodnoty obsah rozšíří, aby se zachovaly názvy polí a typy polí tématu Pulsear.
__topic STRING NOT NULL
: Pulsar topic name.__messageId BINARY NOT NULL
: Pulsar message id.__publishTime TIMESTAMP NOT NULL
: Pulsar message publish time.__eventTime TIMESTAMP NOT NULL
: Čas události pulzární zprávy.__messageProperties MAP<STRING, STRING>
: Vlastnosti pulzární zprávy.
Příklady
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.