Función con valores de tabla de transmisión por secuencias read_pubsub
Se aplica a: Databricks SQL Databricks Runtime 13.3 LTS y versiones posteriores
Devuelve una tabla con registros leídos de Pub/Sub de un tema. Solo admite consultas de streaming.
Sintaxis
read_pubsub( { parameter => value } [, ...])
Argumentos
read_pubsub
requiere una invocación de parámetros con nombre.
Los únicos argumentos necesarios son subscriptionId
, projectId
, y topicId
. Todos los demás argumentos son opcionales.
Para obtener descripciones de argumentos completas, vea Opciones de configuración para la lectura de secuencias pub/sub.
Databricks recomienda usar secretos al proporcionar opciones de autorización. Vea función secreta.
Para obtener más información sobre cómo configurar el acceso a Pub/Sub, vea Configurar el acceso a Pub/Sub.
Parámetro | Tipo | Descripción |
---|---|---|
subscriptionId |
STRING |
Obligatorio, el identificador único asignado a una suscripción de Pub/Sub. |
projectId |
STRING |
Obligatorio, el identificador de proyecto de Google Cloud asociado al tema Pub/Sub. |
topicId |
STRING |
Obligatorio, el identificador o el nombre del tema Pub/Sub al que suscribirse. |
clientEmail |
STRING |
Dirección de correo electrónico asociada a una cuenta de servicio para la autenticación. |
clientId |
STRING |
Identificador de cliente asociado a la cuenta de servicio para la autenticación. |
privateKeyId |
STRING |
Identificador de la clave privada asociada a la cuenta de servicio. |
privateKey |
STRING |
Clave privada asociada a la cuenta de servicio para la autenticación. |
Estos argumentos se usan para un ajuste más preciso al leer desde Pub/Sub:
Parámetro | Tipo | Descripción |
---|---|---|
numFetchPartitions |
STRING |
Opcional con el número predeterminado de ejecutores. Número de tareas paralelas de Spark que capturan registros de una suscripción. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Opcional con predeterminadfalse . Si se establece en true, la suscripción pasada a la secuencia se elimina cuando finaliza el trabajo de streaming. |
maxBytesPerTrigger |
STRING |
Límite flexible para el tamaño del lote que se va a procesar durante cada microlote desencadenado. El valor predeterminado no es ‘ninguno’. |
maxRecordsPerFetch |
STRING |
Número de registros que se van a capturar por tarea antes de procesar los registros. El valor predeterminado es ‘1000’. |
maxFetchPeriod |
STRING |
Duración de tiempo para cada tarea que se va a capturar antes de procesar los registros. El valor predeterminado es "10 s". |
Devoluciones
Tabla de registros Pub/Sub con el esquema siguiente. La columna de atributos podría ser null, pero todas las demás columnas no son null.
Nombre | Tipo de datos | Nullable | Estándar | Descripción |
---|---|---|---|---|
messageId |
STRING |
No | Identificador único del mensaje Pub/Sub. | |
payload |
BINARY |
No | Contenido del mensaje Pub/Sub. | |
attributes |
STRING |
Sí | Pares clave-valor que representan los atributos del mensaje Pub/Sub. Se trata de una cadena codificada en json. | |
publishTimestampInMillis |
BIGINT |
No | Marca de tiempo cuando se publicó el mensaje, en milisegundos. | |
sequenceNumber |
BIGINT |
No | Identificador único del registro dentro de su partición. |
Ejemplos
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Los datos ahora deben consultarse desde el testing.streaming_table
para realizar un análisis posterior.
Consultas erróneas:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);