Função com valor de tabela read_kafka
Aplica-se a: SQL do Databricks Databricks Runtime 13.3 LTS e versões posteriores
Lê dados de um cluster do Apache Kafka e retorna os dados em formato tabular.
Pode ler dados de um ou mais tópicos do Kafka. É compatível com consultas em lote e ingestão de streaming.
Sintaxe
read_kafka([option_key => option_value ] [, ...])
Argumentos
Essa função requer invocação de parâmetro nomeada.
option_key
: o nome da opção a ser configurada. Você deve usar acento grave (`) para opções que contenham pontos (.
).option_value
: uma expressão constante para definir a opção. Aceita literais e funções escalares.
Retornos
Registros lidos de um cluster do Apache Kafka com o seguinte esquema:
key BINARY
: a chave do registro Kafka.value BINARY NOT NULL
: o valor do registro Kafka.topic STRING NOT NULL
: o nome do tópico Kafka do qual o registro é lido.partition INT NOT NULL
: a ID da partição Kafka da qual o registro é lido.offset BIGINT NOT NULL
: o número de deslocamento do registro no KafkaTopicPartition
.timestamp TIMESTAMP NOT NULL
: um valor de carimbo de data/hora para o registro. A colunatimestampType
define ao que esse carimbo de data/hora corresponde.timestampType INTEGER NOT NULL
: o tipo do carimbo de data/hora especificado na colunatimestamp
.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: valores de cabeçalho fornecidos como parte do registro (se habilitado).
Exemplos
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opções
Você pode encontrar uma lista detalhada de opções na Documentação do Apache Spark.
Opções necessárias
Forneça a opção abaixo para se conectar ao cluster do Kafka.
Opção |
---|
bootstrapServers Digite: String Uma lista separada por vírgulas de pares de host/porta apontando para o cluster do Kafka. Valor padrão: nenhum |
Forneça apenas uma das opções abaixo para configurar quais tópicos do Kafka serão usados para efetuar pull de dados.
Opção |
---|
assign Digite: String Uma cadeia de caracteres JSON que contém as partições de tópico específicas a serem consumidas. Por exemplo, para '{"topicA":[0,1],"topicB":[2,4]}' , a 0ª e a 1ª partições do tópico serão consumidas.Valor padrão: nenhum |
subscribe Digite: String Uma lista separada por vírgulas de tópicos do Kafka para leitura. Valor padrão: nenhum |
subscribePattern Digite: String Uma expressão regular que corresponde a tópicos a serem assinados. Valor padrão: nenhum |
Opções diversas
read_kafka
podem ser usados nas consultas em lote e nas consultas de streaming. As opções a seguir especificam a que tipo de consulta elas se aplicam.
Opção |
---|
endingOffsets Tipo: String Tipo de Consulta: somente loteOs deslocamentos a serem lidos até para uma consulta em lote, seja "latest" para especificar os registros mais recentes ou uma cadeia de caracteres JSON especificando um deslocamento final para cada TopicPartition. No JSON, -1 como um deslocamento pode ser usado para se referir ao mais recente. -2 (mais antigo) como um deslocamento não é permitido.Valor padrão: "latest" |
endingOffsetsByTimestamp Tipo: String Tipo de Consulta: somente loteUma cadeia de caracteres JSON que especifica um carimbo de data/hora final para leitura até para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo1686444353000 . Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora.endingOffsetsByTimestamp tem precedência sobre endingOffsets .Valor padrão: nenhum |
endingTimestamp Tipo: String Tipo de Consulta: somente loteUm valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo "1686444353000" . Se o Kafka não retornar o deslocamento correspondente, o deslocamento será definido como mais recente. Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora. Observação: endingTimestamp tem precedência sobre endingOffsetsByTimestamp eendingOffsets .Valor padrão: nenhum |
includeHeaders Tipo: Boolean Tipo de Consulta: streaming e loteSe os cabeçalhos Kafka devem ser incluídos na linha. Valor padrão: false |
kafka.<consumer_option> Tipo: String Tipo de Consulta: streaming e loteTodas as opções específicas do consumidor do Kafka podem ser passadas com o prefixo kafka. . Essas opções precisam ser acompanhadas por acentos graves quando fornecidas; caso contrário, você receberá um erro do analisador. Você pode encontrar as opções na documentação do Kafka.Observação: você não deve definir as seguintes opções com essa função: key.deserializer , value.deserializer , bootstrap.servers , group.id Valor padrão: nenhum |
maxOffsetsPerTrigger Tipo: Long Tipo de Consulta: somente streamingLimite de taxa do número máximo de deslocamentos ou linhas processadas por intervalo de acionamento. O número total especificado de deslocamentos será dividido proporcionalmente entre TopicPartitions. Valor padrão: nenhum |
startingOffsets Tipo: String Tipo de Consulta: streaming e loteO ponto de partida quando uma consulta é iniciada, seja "earliest" , que é a partir dos primeiros deslocamentos, ou "latest" , que é apenas a partir dos últimos deslocamentos, ou uma cadeia de caracteres JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 como um deslocamento pode ser usado para se referir ao mais antigo, e -1 ao mais recente.Observação: para as consultas em lote, a opção "mais recente" (implicitamente ou usando -1 em JSON) não é permitida. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas na opção “mais antigo”. Valor padrão: "latest" para streaming, "earliest" para lote |
startingOffsetsByTimestamp Tipo: String Tipo de Consulta: streaming e loteUma cadeia de caracteres JSON especificando um carimbo de data/hora inicial para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo 1686444353000 . Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora. Se o Kafka não retornar o deslocamento correspondente, o comportamento seguirá para o valor da opção startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp tem precedência sobre startingOffsets .Observação: para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas na opção “mais antigo”. Valor padrão: nenhum |
startingOffsetsByTimestampStrategy Tipo: String Tipo de Consulta: streaming e loteEssa estratégia é usada quando o deslocamento inicial especificado pelo carimbo de data/hora (global ou por partição) não corresponde ao deslocamento retornado pelo Kafka. As estratégias disponíveis são: - "error" : causa uma falha na consulta- "latest" : atribui o deslocamento mais recente para essas partições para que o Spark possa ler registros mais recentes dessas partições em microlotes posteriores.Valor padrão: "error" |
startingTimestamp Tipo: String Tipo de Consulta: streaming e loteUm valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo "1686444353000" . Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora. Se o Kafka não retornar o deslocamento correspondente, o comportamento seguirá para o valor da opção startingOffsetsByTimestampStrategy .startingTimestamp tem precedência sobre startingOffsetsByTimestamp e startingOffsets .Observação: para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas em uma consulta serão iniciadas mais cedo. Valor padrão: nenhum |
Observação
O deslocamento retornado para cada partição é o deslocamento mais antigo cujo carimbo de data/hora é maior ou igual ao carimbo de data/hora fornecido na partição correspondente. O comportamento varia entre as opções se o Kafka não retornar o deslocamento correspondente. Verifique a descrição de cada opção.
O Spark simplesmente passa as informações do carimbo de data/hora para KafkaConsumer.offsetsForTimes
e não interpreta nem raciocina sobre o valor. Para obter mais detalhes sobre KafkaConsumer.offsetsForTimes
, confira a documentação. Além disso, o significado do carimbo de data/hora pode variar de acordo com a configuração do Kafka (log.message.timestamp.type
). Para obter detalhes, confira a Documentação do Apache Kafka.