read_kafka funzione con valori di tabella

Si applica a: segno di spunta sì Databricks SQL segno di spunta sì Databricks Runtime 13.3 LTS e versioni successive

Legge i dati da un cluster Apache Kafka e restituisce i dati in formato tabulare.

Può leggere i dati da uno o più argomenti kafka. Supporta sia le query batch che l'inserimento in streaming.

Sintassi

read_kafka([option_key => option_value ] [, ...])

Argomenti

Questa funzione richiede la chiamata di parametri denominati.

  • option_key: nome dell'opzione da configurare. È necessario utilizzare i backtick (') per le opzioni che contengono punti (.).
  • option_value: espressione costante per impostare l'opzione . Accetta valori letterali e funzioni scalari.

Valori restituiti

I record letti da un cluster Apache Kafka con lo schema seguente:

  • key BINARY: chiave del record Kafka.
  • value BINARY NOT NULL: valore del record Kafka.
  • topic STRING NOT NULL: nome dell'argomento Kafka da cui viene letto il record.
  • partition INT NOT NULL: ID della partizione Kafka da cui viene letto il record.
  • offset BIGINT NOT NULL: numero di offset del record in Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: valore timestamp per il record. La timestampType colonna definisce il timestamp a cui corrisponde questo timestamp.
  • timestampType INTEGER NOT NULL: tipo del timestamp specificato nella timestamp colonna.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: valori di intestazione forniti come parte del record (se abilitato).

Esempi

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opzioni

È possibile trovare un elenco dettagliato delle opzioni nella documentazione di Apache Spark.

Opzioni obbligatorie

Fornire l'opzione seguente per la connessione al cluster Kafka.

Opzione
bootstrapServers

Tipo: String

Elenco delimitato da virgole di coppie host/porta che puntano al cluster Kafka.

Valore predefinito: Nessuno

Fornire solo una delle opzioni seguenti per configurare gli argomenti Kafka da cui eseguire il pull dei dati.

Opzione
assign

Tipo: String

Stringa JSON che contiene le partizioni di argomento specifiche da utilizzare. Ad esempio, per '{"topicA":[0,1],"topicB":[2,4]}'le partizioni 0 e 1° di topicA verranno utilizzate.

Valore predefinito: Nessuno
subscribe

Tipo: String

Elenco delimitato da virgole di argomenti Kafka da cui leggere.

Valore predefinito: Nessuno
subscribePattern

Tipo: String

Argomenti corrispondenti a un'espressione regolare a cui sottoscrivere.

Valore predefinito: Nessuno

Altre opzioni

read_kafka può essere usato nelle query batch e nelle query di streaming. Le opzioni seguenti specificano il tipo di query a cui si applicano.

Opzione
endingOffsets

Tipo: Tipo di query: String solo batch

Offset da leggere fino a quando per una query batch, "latest" specificare i record più recenti o una stringa JSON che specifica un offset finale per ogni TopicPartition. Nel codice JSON, -1 come offset può essere usato per fare riferimento alla versione più recente. -2 (meno recente) come offset non consentito.

Valore predefinito: "latest"
endingOffsetsByTimestamp

Tipo: Tipo di query: String solo batch

Stringa JSON che specifica un timestamp finale da leggere fino a quando non per ogni TopicPartition. I timestamp devono essere forniti come valore lungo del timestamp in millisecondi, ad esempio ,1970-01-01 00:00:00 UTC
1686444353000. Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente .
endingOffsetsByTimestamp ha la precedenza rispetto a endingOffsets.

Valore predefinito: Nessuno
endingTimestamp

Tipo: Tipo di query: String solo batch

Valore stringa del timestamp espresso in millisecondi da
1970-01-01 00:00:00 UTC, ad esempio "1686444353000". Se Kafka non restituisce l'offset corrispondente, l'offset verrà impostato su latest. Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Nota: endingTimestamp ha la precedenza su endingOffsetsByTimestamp e
endingOffsets.

Valore predefinito: Nessuno
includeHeaders

Tipo: Tipo di query: Boolean streaming e batch

Indica se includere le intestazioni Kafka nella riga.

Valore predefinito: false
kafka.<consumer_option>

Tipo: Tipo di query: String streaming e batch

Qualsiasi opzione specifica del consumer Kafka può essere passata con il kafka. prefisso . Queste opzioni devono essere racchiuse da backtick quando specificato. In caso contrario, verrà visualizzato un errore del parser. Le opzioni sono disponibili nella documentazione di Kafka.

Nota: non è consigliabile impostare le opzioni seguenti con questa funzione:
key.deserializer, value.deserializer, bootstrap.serversgroup.id

Valore predefinito: Nessuno
maxOffsetsPerTrigger

Tipo: Tipo di query: Long solo streaming

Limite di velocità per il numero massimo di offset o righe elaborate per intervallo di trigger. Il numero totale specificato di offset verrà suddiviso proporzionalmente tra TopicPartitions.

Valore predefinito: Nessuno
startingOffsets

Tipo: Tipo di query: String streaming e batch

Punto iniziale all'avvio di una query, "earliest" ovvero dagli offset meno recenti, "latest" ovvero dagli offset più recenti o da una stringa JSON che specifica un offset iniziale per ogni TopicPartition. In JSON, -2 come offset può essere usato per fare riferimento alla prima versione più -1 recente.

Nota: per le query batch, la versione più recente (implicitamente o tramite -1 in JSON) non è consentita. Per le query di streaming, questo vale solo quando viene avviata una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto.

Valore predefinito: "latest" per lo streaming, "earliest" per batch
startingOffsetsByTimestamp

Tipo: Tipo di query: String streaming e batch

Stringa JSON che specifica un timestamp iniziale per ogni TopicPartition. I timestamp devono essere specificati come valore lungo del timestamp in millisecondi, ad 1970-01-01 00:00:00 UTCesempio 1686444353000. Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Se Kafka non restituisce l'offset corrispondente, il comportamento seguirà il valore dell'opzione startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp ha la precedenza rispetto a startingOffsets.

Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto.

Valore predefinito: Nessuno
startingOffsetsByTimestampStrategy

Tipo: Tipo di query: String streaming e batch

Questa strategia viene usata quando l'offset iniziale specificato per timestamp (globale o per partizione) non corrisponde all'offset restituito da Kafka. Le strategie disponibili sono:

- "error": non è possibile eseguire la query
- "latest": assegna l'offset più recente per queste partizioni in modo che Spark possa leggere i record più recenti da queste partizioni in micro batch successivi.

Valore predefinito: "error"
startingTimestamp

Tipo: Tipo di query: String streaming e batch

Valore stringa del timestamp espresso in millisecondi da
1970-01-01 00:00:00 UTC, ad esempio "1686444353000". Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Se Kafka non restituisce l'offset corrispondente, il comportamento seguirà il valore dell'opzione startingOffsetsByTimestampStrategy.
startingTimestamp ha la precedenza su startingOffsetsByTimestamp e startingOffsets.

Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno prima.

Valore predefinito: Nessuno

Nota

L'offset restituito per ogni partizione è l'offset meno recente il cui timestamp è maggiore o uguale al timestamp specificato nella partizione corrispondente. Il comportamento varia in base alle opzioni se Kafka non restituisce l'offset corrispondente. Controllare la descrizione di ogni opzione.

Spark passa semplicemente le informazioni sul timestamp a KafkaConsumer.offsetsForTimese non interpreta o ragiona sul valore. Per altri dettagli su KafkaConsumer.offsetsForTimes, vedere la documentazione. Inoltre, il significato del timestamp qui può variare in base alla configurazione Kafka (log.message.timestamp.type). Per informazioni dettagliate, vedere la documentazione di Apache Kafka.