read_pubsub
streamovaná funkce s hodnotou tabulky
Platí pro: Databricks SQL Databricks Runtime 13.3 LTS a vyšší
Vrátí tabulku se záznamy načtenými z pub/Sub z tématu. Podporuje pouze dotazy streamování.
Syntaxe
read_pubsub( { parameter => value } [, ...])
Argumenty
read_pubsub
vyžaduje vyvolání pojmenovaného parametru.
Jedinými povinnými argumenty jsou subscriptionId
, projectId
a topicId
. Všechny ostatní argumenty jsou volitelné.
Úplný popis argumentu najdete v tématu Konfigurace možností čtení pub/sub streamingu.
Databricks doporučuje používat tajné kódy při poskytování možností autorizace. Viz funkce tajného kódu.
Podrobnosti o konfiguraci přístupu k pub/sub naleznete v tématu Konfigurace přístupu k Pub/Sub.
Parametr | Typ | Popis |
---|---|---|
subscriptionId |
STRING |
Povinný identifikátor přiřazený k předplatnému Pub/Sub. |
projectId |
STRING |
Povinné je ID projektu Google Cloud přidružené k tématu Pub/Sub. |
topicId |
STRING |
Povinné, ID nebo název tématu Pub/Sub pro přihlášení k odběru. |
clientEmail |
STRING |
E-mailová adresa přidružená k účtu služby pro ověřování. |
clientId |
STRING |
ID klienta přidružené k účtu služby pro ověřování. |
privateKeyId |
STRING |
ID privátního klíče přidruženého k účtu služby. |
privateKey |
STRING |
Privátní klíč přidružený k účtu služby pro ověřování. |
Tyto argumenty se používají k dalšímu vyladění při čtení z pub/sub:
Parametr | Typ | Popis |
---|---|---|
numFetchPartitions |
STRING |
Volitelné s výchozím počtem exekutorů. Početparalelních |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Volitelné s výchozím nastavením false . Pokud je nastavená hodnota true, odběr předaný streamu se po skončení úlohy streamování odstraní. |
maxBytesPerTrigger |
STRING |
Měkký limit velikosti dávky, která se má zpracovat během každé aktivované mikrodávkové dávky. Výchozí hodnota je žádná. |
maxRecordsPerFetch |
STRING |
Počet záznamů, které se mají načíst na každou úlohu před zpracováním záznamů. Výchozí hodnota je 1000. |
maxFetchPeriod |
STRING |
Doba trvání každého úkolu, která se má načíst před zpracováním záznamů. Výchozí hodnota je 10s. |
Návraty
Tabulka záznamů Pub/Sub s následujícím schématem. Sloupec atributů může mít hodnotu null, ale všechny ostatní sloupce nemají hodnotu null.
Name | Datový typ | Vynulovatelné | Standard | Popis |
---|---|---|---|---|
messageId |
STRING |
No | Jedinečný identifikátor zprávy Pub/Sub. | |
payload |
BINARY |
No | Obsah zprávy Pub/Sub | |
attributes |
STRING |
Ano | Páry klíč-hodnota představující atributy zprávy Pub/Sub Jedná se o řetězec kódovaný ve formátu JSON. | |
publishTimestampInMillis |
BIGINT |
No | Časové razítko při publikování zprávy v milisekundách. | |
sequenceNumber |
BIGINT |
No | Jedinečný identifikátor záznamu v rámci jeho horizontálního oddílu. |
Příklady
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Data by se teď museli dotazovat z testing.streaming_table
další analýzy.
Chybné dotazy:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);