Uso di Apache Kafka® su HDInsight con Apache Flink® su HDInsight su AKS
Importante
Azure HDInsight su AKS è stato ritirato il 31 gennaio 2025. Scopri di più con questo annuncio.
È necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare la chiusura brusca dei carichi di lavoro.
Importante
Questa funzionalità è attualmente in anteprima. Le condizioni supplementari per l'utilizzo per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure in versione beta, in anteprima o altrimenti non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere informazioni sull'anteprima di Azure HDInsight su AKS. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire microsoft per altri aggiornamenti su community di Azure HDInsight.
Un caso d'uso noto per Apache Flink è l'analisi di flusso. Scelta comune da parte di molti utenti per l'uso dei flussi di dati, inseriti con Apache Kafka. Le installazioni tipiche di Flink e Kafka iniziano con l'invio dei flussi di eventi a Kafka, che possono essere consumati dai processi di Flink.
Questo esempio utilizza HDInsight su cluster AKS che eseguono Flink 1.17.0 per elaborare i dati di streaming consumando e producendo topic Kafka.
Nota
FlinkKafkaConsumer è deprecato e verrà rimosso con Flink 1.17. Usare invece KafkaSource. FlinkKafkaProducer è deprecato e verrà rimosso con Flink 1.15. Usare invece KafkaSink.
Prerequisiti
Sia Kafka che Flink devono trovarsi nella stessa rete virtuale o ci dovrebbe essere un peering VNet tra i due cluster.
Creare un cluster Kafka nella stessa rete virtuale. È possibile scegliere Kafka 3.2 o 2.4 in HDInsight in base all'utilizzo corrente.
Aggiungere i dettagli della rete virtuale nella sezione rete virtuale.
Creare un HDInsight nel pool del cluster AKS sulla stessa VNet.
Creare un cluster Flink nel pool di cluster creato.
Connettore Apache Kafka
Flink offre un connettore Apache Kafka per la lettura e la scrittura di dati in topic Kafka con garanzie di esattamente una volta.
dipendenza Maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Compilazione del sink Kafka
Il sink Kafka fornisce una classe builder per costruire un'istanza di KafkaSink. Utilizziamo lo stesso per costruire il Sink e lo usiamo insieme a un cluster Flink in esecuzione su HDInsight su AKS.
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Scrittura di un programma Java Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
Pacchettizzare il jar e inviare il job a Flink
In Webssh caricare il file JAR e inviare il file JAR
Sull'interfaccia utente del dashboard di Flink
Crea il topic - fai clic su Kafka
Consuma l'argomento - eventi su Kafka
Riferimento
- connettore Apache Kafka
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).