read_kafka
funzione con valori di tabella
Si applica a: Databricks SQL Databricks Runtime 13.3 LTS e versioni successive
Legge i dati da un cluster Apache Kafka e restituisce i dati in formato tabulare.
Può leggere i dati da uno o più argomenti kafka. Supporta sia le query batch che l'inserimento in streaming.
Sintassi
read_kafka([option_key => option_value ] [, ...])
Argomenti
Questa funzione richiede la chiamata di parametri denominati.
option_key
: nome dell'opzione da configurare. È necessario utilizzare i backtick (') per le opzioni che contengono punti (.
).option_value
: espressione costante per impostare l'opzione . Accetta valori letterali e funzioni scalari.
Valori restituiti
I record letti da un cluster Apache Kafka con lo schema seguente:
key BINARY
: chiave del record Kafka.value BINARY NOT NULL
: valore del record Kafka.topic STRING NOT NULL
: nome dell'argomento Kafka da cui viene letto il record.partition INT NOT NULL
: ID della partizione Kafka da cui viene letto il record.offset BIGINT NOT NULL
: numero di offset del record in KafkaTopicPartition
.timestamp TIMESTAMP NOT NULL
: valore timestamp per il record. LatimestampType
colonna definisce il timestamp a cui corrisponde questo timestamp.timestampType INTEGER NOT NULL
: tipo del timestamp specificato nellatimestamp
colonna.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: valori di intestazione forniti come parte del record (se abilitato).
Esempi
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opzioni
È possibile trovare un elenco dettagliato delle opzioni nella documentazione di Apache Spark.
Opzioni obbligatorie
Fornire l'opzione seguente per la connessione al cluster Kafka.
Opzione |
---|
bootstrapServers Tipo: String Elenco delimitato da virgole di coppie host/porta che puntano al cluster Kafka. Valore predefinito: Nessuno |
Fornire solo una delle opzioni seguenti per configurare gli argomenti Kafka da cui eseguire il pull dei dati.
Opzione |
---|
assign Tipo: String Stringa JSON che contiene le partizioni di argomento specifiche da utilizzare. Ad esempio, per '{"topicA":[0,1],"topicB":[2,4]}' le partizioni 0 e 1° di topicA verranno utilizzate.Valore predefinito: Nessuno |
subscribe Tipo: String Elenco delimitato da virgole di argomenti Kafka da cui leggere. Valore predefinito: Nessuno |
subscribePattern Tipo: String Argomenti corrispondenti a un'espressione regolare a cui sottoscrivere. Valore predefinito: Nessuno |
Altre opzioni
read_kafka
può essere usato nelle query batch e nelle query di streaming. Le opzioni seguenti specificano il tipo di query a cui si applicano.
Opzione |
---|
endingOffsets Tipo: Tipo di query: String solo batchOffset da leggere fino a quando per una query batch, "latest" specificare i record più recenti o una stringa JSON che specifica un offset finale per ogni TopicPartition. Nel codice JSON, -1 come offset può essere usato per fare riferimento alla versione più recente. -2 (meno recente) come offset non consentito.Valore predefinito: "latest" |
endingOffsetsByTimestamp Tipo: Tipo di query: String solo batchStringa JSON che specifica un timestamp finale da leggere fino a quando non per ogni TopicPartition. I timestamp devono essere forniti come valore lungo del timestamp in millisecondi, ad esempio , 1970-01-01 00:00:00 UTC 1686444353000 . Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente .endingOffsetsByTimestamp ha la precedenza rispetto a endingOffsets .Valore predefinito: Nessuno |
endingTimestamp Tipo: Tipo di query: String solo batchValore stringa del timestamp espresso in millisecondi da 1970-01-01 00:00:00 UTC , ad esempio "1686444353000" . Se Kafka non restituisce l'offset corrispondente, l'offset verrà impostato su latest. Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Nota: endingTimestamp ha la precedenza su endingOffsetsByTimestamp eendingOffsets .Valore predefinito: Nessuno |
includeHeaders Tipo: Tipo di query: Boolean streaming e batchIndica se includere le intestazioni Kafka nella riga. Valore predefinito: false |
kafka.<consumer_option> Tipo: Tipo di query: String streaming e batchQualsiasi opzione specifica del consumer Kafka può essere passata con il kafka. prefisso . Queste opzioni devono essere racchiuse da backtick quando specificato. In caso contrario, verrà visualizzato un errore del parser. Le opzioni sono disponibili nella documentazione di Kafka.Nota: non è consigliabile impostare le opzioni seguenti con questa funzione: key.deserializer , value.deserializer , bootstrap.servers group.id Valore predefinito: Nessuno |
maxOffsetsPerTrigger Tipo: Tipo di query: Long solo streamingLimite di velocità per il numero massimo di offset o righe elaborate per intervallo di trigger. Il numero totale specificato di offset verrà suddiviso proporzionalmente tra TopicPartitions. Valore predefinito: Nessuno |
startingOffsets Tipo: Tipo di query: String streaming e batchPunto iniziale all'avvio di una query, "earliest" ovvero dagli offset meno recenti, "latest" ovvero dagli offset più recenti o da una stringa JSON che specifica un offset iniziale per ogni TopicPartition. In JSON, -2 come offset può essere usato per fare riferimento alla prima versione più -1 recente.Nota: per le query batch, la versione più recente (implicitamente o tramite -1 in JSON) non è consentita. Per le query di streaming, questo vale solo quando viene avviata una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto. Valore predefinito: "latest" per lo streaming, "earliest" per batch |
startingOffsetsByTimestamp Tipo: Tipo di query: String streaming e batchStringa JSON che specifica un timestamp iniziale per ogni TopicPartition. I timestamp devono essere specificati come valore lungo del timestamp in millisecondi, ad 1970-01-01 00:00:00 UTC esempio 1686444353000 . Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Se Kafka non restituisce l'offset corrispondente, il comportamento seguirà il valore dell'opzione startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp ha la precedenza rispetto a startingOffsets .Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto. Valore predefinito: Nessuno |
startingOffsetsByTimestampStrategy Tipo: Tipo di query: String streaming e batchQuesta strategia viene usata quando l'offset iniziale specificato per timestamp (globale o per partizione) non corrisponde all'offset restituito da Kafka. Le strategie disponibili sono: - "error" : non è possibile eseguire la query- "latest" : assegna l'offset più recente per queste partizioni in modo che Spark possa leggere i record più recenti da queste partizioni in micro batch successivi.Valore predefinito: "error" |
startingTimestamp Tipo: Tipo di query: String streaming e batchValore stringa del timestamp espresso in millisecondi da 1970-01-01 00:00:00 UTC , ad esempio "1686444353000" . Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Se Kafka non restituisce l'offset corrispondente, il comportamento seguirà il valore dell'opzione startingOffsetsByTimestampStrategy .startingTimestamp ha la precedenza su startingOffsetsByTimestamp e startingOffsets .Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno prima. Valore predefinito: Nessuno |
Nota
L'offset restituito per ogni partizione è l'offset meno recente il cui timestamp è maggiore o uguale al timestamp specificato nella partizione corrispondente. Il comportamento varia in base alle opzioni se Kafka non restituisce l'offset corrispondente. Controllare la descrizione di ogni opzione.
Spark passa semplicemente le informazioni sul timestamp a KafkaConsumer.offsetsForTimes
e non interpreta o ragiona sul valore. Per altri dettagli su KafkaConsumer.offsetsForTimes
, vedere la documentazione. Inoltre, il significato del timestamp qui può variare in base alla configurazione Kafka (log.message.timestamp.type
). Per informazioni dettagliate, vedere la documentazione di Apache Kafka.