read_pulsar streamovaná funkce s hodnotou tabulky

Platí pro: zaškrtnutí označeného ano Databricks SQL zaškrtnutí označeného ano Databricks Runtime 14.1 a vyšší

Důležité

Tato funkce je ve verzi Public Preview.

Vrátí tabulku se záznamy načtenými z Pulsaru.

Tato funkce s hodnotou tabulky podporuje pouze streamování, nikoli dávkové dotazy.

Syntaxe

read_pulsar ( { option_key => option_value } [, ...] )

Argumenty

Tato funkce vyžaduje vyvolání pojmenovaného parametru pro klíče možností.

serviceUrl Možnosti a topic jsou povinné.

Popisy argumentů jsou zde stručné. Další popisy najdete v dokumentaci ke strukturovanému streamování Pulsear .

Možnost Typ Výchozí Popis
serviceUrl STRING Povinné Identifikátor URI služby Pulsear.
topic STRING Povinné Téma, ze které se má číst.
předdefinovaná podsítě STRING Nic Předdefinovaný název předplatného používaný konektorem ke sledování průběhu aplikace Spark.
subscriptionPrefix STRING Nic Předpona používaná konektorem ke generování náhodného předplatného ke sledování průběhu aplikace Spark.
pollTimeoutMs DLOUHÝ 120000 Časový limit pro čtení zpráv z Pulsaru v milisekundách.
failOnDataLoss BOOLEOVSKÝ true Určuje, jestli se dotaz nezdaří, když dojde ke ztrátě dat (například témata se odstraní nebo zprávy se odstraní kvůli zásadám uchovávání informací).
startingOffsets STRING nejnovější Počáteční bod při spuštění dotazu, buď nejstaršího, nejnovějšího, nebo řetězce JSON, který určuje konkrétní posun. Pokud je nejnovější, přečte si čtenář nejnovější záznamy po spuštění. Pokud je nejstarší, čtečka čte od nejstaršího posunu. Uživatel může také zadat řetězec JSON, který určuje konkrétní posun.
počáteční čas STRING Nic Při zadání bude zdroj Pulsear číst zprávy od pozice zadaného počátečního času.

Pro ověřování pulzárního klienta se používají následující argumenty:

Možnost Typ Výchozí Popis
pulsearClientAuthPluginClassName STRING Nic Název ověřovacího modulu plug-in.
pulsearClientAuthParams STRING Nic Parametry ověřovacího modulu plug-in.
pulsearClientUseKeyStoreTls STRING Nic Určuje, jestli se má úložiště klíčů používat pro ověřování tls.
pulsearClientTlsTrustStoreType STRING Nic Typ souboru TrustStore pro ověřování tls.
pulsearClientTlsTrustStorePath STRING Nic Cesta k souboru TrustStore pro ověřování tls.
pulsearClientTlsTrustStorePassword STRING Nic Heslo TrustStore pro ověřování tls

Tyto argumenty se používají pro konfiguraci a ověřování řízení pulzárního přístupu, konfigurace pulsar admin se vyžaduje pouze v případě, že je povoleno řízení přístupu (pokud je nastaven maxBytesPerTrigger).

Možnost Typ Výchozí Popis
maxBytesPerTrigger BIGINT Nic Měkký limit maximálního počtu bajtů, které chceme zpracovat na mikrobatch. Pokud je zadáno, musí být zadána také adresa admin.url.
adminUrl STRING Nic Konfigurace Pulsear serviceHttpUrl. Vyžaduje se pouze v případě, že je zadán maxBytesPerTrigger.
pulsearAdminAuthPlugin STRING Nic Název ověřovacího modulu plug-in.
pulsearAdminAuthParams STRING Nic Parametry ověřovacího modulu plug-in.
pulsearClientUseKeyStoreTls STRING Nic Určuje, jestli se má úložiště klíčů používat pro ověřování tls.
pulsearAdminTlsTrustStoreType STRING Nic Typ souboru TrustStore pro ověřování tls.
pulsearAdminTlsTrustStorePath STRING Nic Cesta k souboru TrustStore pro ověřování tls.
pulsearAdminTlsTrustStorePassword STRING Nic Heslo TrustStore pro ověřování tls

Návraty

Tabulka pulsárních záznamů s následujícím schématem.

  • __key STRING NOT NULL: Pulsar message key.

  • value BINARY NOT NULL: Hodnota pulzární zprávy.

    Poznámka: Pro témata se schématem Avro nebo JSON se místo načítání obsahu do pole binární hodnoty obsah rozšíří, aby se zachovaly názvy polí a typy polí tématu Pulsear.

  • __topic STRING NOT NULL: Pulsar topic name.

  • __messageId BINARY NOT NULL: Pulsar message id.

  • __publishTime TIMESTAMP NOT NULL: Pulsar message publish time.

  • __eventTime TIMESTAMP NOT NULL: Čas události pulzární zprávy.

  • __messageProperties MAP<STRING, STRING>: Vlastnosti pulzární zprávy.

Příklady

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.