Função com valor de tabela read_kafka

Aplica-se a:verificação marcada como sim SQL do Databricks verificação marcada como sim Databricks Runtime 13.3 LTS e versões posteriores

Lê dados de um cluster do Apache Kafka e retorna os dados em formato tabular.

Pode ler dados de um ou mais tópicos do Kafka. É compatível com consultas em lote e ingestão de streaming.

Sintaxe

read_kafka([option_key => option_value ] [, ...])

Argumentos

Essa função requer invocação de parâmetro nomeada.

  • option_key: o nome da opção a ser configurada. Você deve usar acento grave (`) para opções que contenham pontos (.).
  • option_value: uma expressão constante para definir a opção. Aceita literais e funções escalares.

Retornos

Registros lidos de um cluster do Apache Kafka com o seguinte esquema:

  • key BINARY: a chave do registro Kafka.
  • value BINARY NOT NULL: o valor do registro Kafka.
  • topic STRING NOT NULL: o nome do tópico Kafka do qual o registro é lido.
  • partition INT NOT NULL: a ID da partição Kafka da qual o registro é lido.
  • offset BIGINT NOT NULL: o número de deslocamento do registro no Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: um valor de carimbo de data/hora para o registro. A coluna timestampType define ao que esse carimbo de data/hora corresponde.
  • timestampType INTEGER NOT NULL: o tipo do carimbo de data/hora especificado na coluna timestamp.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: valores de cabeçalho fornecidos como parte do registro (se habilitado).

Exemplos

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opções

Você pode encontrar uma lista detalhada de opções na Documentação do Apache Spark.

Opções necessárias

Forneça a opção abaixo para se conectar ao cluster do Kafka.

Opção
bootstrapServers

Digite: String

Uma lista separada por vírgulas de pares de host/porta apontando para o cluster do Kafka.

Valor padrão: nenhum

Forneça apenas uma das opções abaixo para configurar quais tópicos do Kafka serão usados para efetuar pull de dados.

Opção
assign

Digite: String

Uma cadeia de caracteres JSON que contém as partições de tópico específicas a serem consumidas. Por exemplo, para '{"topicA":[0,1],"topicB":[2,4]}', a 0ª e a 1ª partições do tópico serão consumidas.

Valor padrão: nenhum
subscribe

Digite: String

Uma lista separada por vírgulas de tópicos do Kafka para leitura.

Valor padrão: nenhum
subscribePattern

Digite: String

Uma expressão regular que corresponde a tópicos a serem assinados.

Valor padrão: nenhum

Opções diversas

read_kafka podem ser usados nas consultas em lote e nas consultas de streaming. As opções a seguir especificam a que tipo de consulta elas se aplicam.

Opção
endingOffsets

Tipo: String Tipo de Consulta: somente lote

Os deslocamentos a serem lidos até para uma consulta em lote, seja "latest" para especificar os registros mais recentes ou uma cadeia de caracteres JSON especificando um deslocamento final para cada TopicPartition. No JSON, -1 como um deslocamento pode ser usado para se referir ao mais recente. -2 (mais antigo) como um deslocamento não é permitido.

Valor padrão: "latest"
endingOffsetsByTimestamp

Tipo: String Tipo de Consulta: somente lote

Uma cadeia de caracteres JSON que especifica um carimbo de data/hora final para leitura até para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo
1686444353000. Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora.
endingOffsetsByTimestamp tem precedência sobre endingOffsets.

Valor padrão: nenhum
endingTimestamp

Tipo: String Tipo de Consulta: somente lote

Um valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde
1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Se o Kafka não retornar o deslocamento correspondente, o deslocamento será definido como mais recente. Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora. Observação: endingTimestamp tem precedência sobre endingOffsetsByTimestamp e
endingOffsets.

Valor padrão: nenhum
includeHeaders

Tipo: Boolean Tipo de Consulta: streaming e lote

Se os cabeçalhos Kafka devem ser incluídos na linha.

Valor padrão: false
kafka.<consumer_option>

Tipo: String Tipo de Consulta: streaming e lote

Todas as opções específicas do consumidor do Kafka podem ser passadas com o prefixo kafka.. Essas opções precisam ser acompanhadas por acentos graves quando fornecidas; caso contrário, você receberá um erro do analisador. Você pode encontrar as opções na documentação do Kafka.

Observação: você não deve definir as seguintes opções com essa função:
key.deserializer, value.deserializer, bootstrap.servers, group.id

Valor padrão: nenhum
maxOffsetsPerTrigger

Tipo: Long Tipo de Consulta: somente streaming

Limite de taxa do número máximo de deslocamentos ou linhas processadas por intervalo de acionamento. O número total especificado de deslocamentos será dividido proporcionalmente entre TopicPartitions.

Valor padrão: nenhum
startingOffsets

Tipo: String Tipo de Consulta: streaming e lote

O ponto de partida quando uma consulta é iniciada, seja "earliest", que é a partir dos primeiros deslocamentos, ou "latest", que é apenas a partir dos últimos deslocamentos, ou uma cadeia de caracteres JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 como um deslocamento pode ser usado para se referir ao mais antigo, e -1 ao mais recente.

Observação: para as consultas em lote, a opção "mais recente" (implicitamente ou usando -1 em JSON) não é permitida. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas na opção “mais antigo”.

Valor padrão: "latest" para streaming, "earliest" para lote
startingOffsetsByTimestamp

Tipo: String Tipo de Consulta: streaming e lote

Uma cadeia de caracteres JSON especificando um carimbo de data/hora inicial para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo 1686444353000. Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora. Se o Kafka não retornar o deslocamento correspondente, o comportamento seguirá para o valor da opção startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp tem precedência sobre startingOffsets.

Observação: para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas na opção “mais antigo”.

Valor padrão: nenhum
startingOffsetsByTimestampStrategy

Tipo: String Tipo de Consulta: streaming e lote

Essa estratégia é usada quando o deslocamento inicial especificado pelo carimbo de data/hora (global ou por partição) não corresponde ao deslocamento retornado pelo Kafka. As estratégias disponíveis são:

* "error": causa uma falha na consulta
* "latest": atribui o deslocamento mais recente para essas partições para que o Spark possa ler registros mais recentes dessas partições em microlotes posteriores.

Valor padrão: "error"
startingTimestamp

Tipo: String Tipo de Consulta: streaming e lote

Um valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde
1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Veja a observação abaixo sobre os detalhes do comportamento com carimbos de data/hora. Se o Kafka não retornar o deslocamento correspondente, o comportamento seguirá para o valor da opção startingOffsetsByTimestampStrategy.
startingTimestamp tem precedência sobre startingOffsetsByTimestamp e startingOffsets.

Observação: para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas em uma consulta serão iniciadas mais cedo.

Valor padrão: nenhum

Observação

O deslocamento retornado para cada partição é o deslocamento mais antigo cujo carimbo de data/hora é maior ou igual ao carimbo de data/hora fornecido na partição correspondente. O comportamento varia entre as opções se o Kafka não retornar o deslocamento correspondente. Verifique a descrição de cada opção.

O Spark simplesmente passa as informações do carimbo de data/hora para KafkaConsumer.offsetsForTimes e não interpreta nem raciocina sobre o valor. Para obter mais detalhes sobre KafkaConsumer.offsetsForTimes, confira a documentação. Além disso, o significado do carimbo de data/hora pode variar de acordo com a configuração do Kafka (log.message.timestamp.type). Para obter detalhes, confira a Documentação do Apache Kafka.