função de streaming com valor de tabela read_pubsub
Aplica-se a: SQL do Databricks Databricks Runtime 13.3 LTS e versões posteriores
Retorna uma tabela com registros lidos do Pub/Sub de um tópico. Só é compatível com consultas de streaming.
Sintaxe
read_pubsub( { parameter => value } [, ...])
Argumentos
read_pubsub
requer invocação de parâmetro nomeado.
Os únicos argumentos necessários são subscriptionId
, projectId
e topicId
. Todos os outros argumentos são opcionais.
Para descrições completas dos argumentos, veja Configurar opções para de leitura de streaming do Pub/Sub.
O Databricks recomenda o uso de segredos ao fornecer opções de autorização. Confira função secreta.
Para mais detalhes sobre a configuração do acesso ao Pub/Sub, confira Configurar o acesso ao Pub/Sub.
Parâmetro | Tipo | Descrição |
---|---|---|
subscriptionId |
STRING |
Obrigatório, o identificador exclusivo atribuído a uma assinatura do Pub/Sub. |
projectId |
STRING |
Obrigatório, a ID do projeto do Google Cloud associada ao tópico do Pub/Sub. |
topicId |
STRING |
Obrigatório, a ID ou o nome do tópico do Pub/Sub para se inscrever. |
clientEmail |
STRING |
O endereço de email associado a uma conta de serviço para autenticação. |
clientId |
STRING |
A ID do cliente associada à conta de serviço para autenticação. |
privateKeyId |
STRING |
A ID da chave privada associada à conta de serviço. |
privateKey |
STRING |
A chave privada associada à conta de serviço para autenticação. |
Esses argumentos são usados para ajustes adicionais ao ler do Pub/Sub:
Parâmetro | Tipo | Descrição |
---|---|---|
numFetchPartitions |
STRING |
Opcional com número padrão de executores. O número de tarefas paralelas do Spark que buscam registros de uma assinatura. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Opcional com padrão false . Se definido como verdadeiro, a assinatura passada para o stream será excluída quando o trabalho de streaming terminar. |
maxBytesPerTrigger |
STRING |
Um limite flexível para o tamanho do lote a ser processado durante cada microlote disparado. O padrão é "none". |
maxRecordsPerFetch |
STRING |
O número de registros a serem buscados por tarefa antes do processamento de registros. O padrão é "1000". |
maxFetchPeriod |
STRING |
A duração do tempo para cada tarefa a ser buscada antes do processamento de registros. O padrão é ’’10s’’. |
Retornos
Uma tabela de registros do Pub/Sub com o seguinte esquema. A coluna de atributos pode ser nula, mas todas as outras colunas não são nulas.
Nome | Tipo de dados | Nullable | Standard | Descrição |
---|---|---|---|---|
messageId |
STRING |
Não | Identificador único para a mensagem do Pub/Sub. | |
payload |
BINARY |
Não | O conteúdo da mensagem do Pub/Sub. | |
attributes |
STRING |
Sim | Pares chave-valor representando os atributos da mensagem do Pub/Sub. Esta é uma string codificada em json. | |
publishTimestampInMillis |
BIGINT |
Não | O carimbo de data/hora de quando a mensagem foi publicada, em milissegundos. | |
sequenceNumber |
BIGINT |
Não | O identificador exclusivo do registro dentro de seu fragmento. |
Exemplos
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Agora, os dados precisariam ser consultados da testing.streaming_table
para análises adicionais.
Consultas errôneas:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);