AKS 上の HDInsight 上の Apache Flink® で HDInsight 上の Apache Kafka® を使用する方法について説明します
重要
現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加の使用条件」に記載されています。 この特定のプレビューについては、Azure HDInsight on AKS のプレビュー情報に関する記事を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新情報については、Azure HDInsight コミュニティをフォローしてください。
Apache Flink のよく知られているユース ケースは、ストリーム分析です。 Apache Kafka を使用して取り込まれるデータ ストリームを使用する多くのユーザーが一般的に選択しています。 Flink と Kafka の一般的なインストールは、Flink ジョブで使用できるイベント ストリームが Kafka にプッシュされることから始まります。
この例では、Flink 1.17.0 を実行する AKS クラスターの HDInsight を使用して、Kafka トピックを使用して生成するストリーミング データを処理します。
Note
FlinkKafkaConsumer は非推奨となり、Flink 1.17 で削除されます。代わりに KafkaSource を使用してください。 FlinkKafkaProducer は非推奨となり、Flink 1.15 で削除されます。代わりに KafkaSink を使用してください。
前提条件
Kafka と Flink の両方が同じ VNet 内にある必要があります。または、2 つのクラスター間に vnet ピアリングが存在する必要があります。
同じ VNet に Kafka クラスターを作成します。 現在の使用状況に基づいて、HDInsight で Kafka 3.2 または 2.4 を選択できます。
仮想ネットワーク セクションに VNet の詳細を追加します。
同じ VNet を使用して HDInsight on AKS クラスター プールを作成します。
作成されたクラスター プールに対する Flink クラスターを作成します。
Apache Kafka コネクタ
Flink には、Kafka トピックとの間でデータを読み書きするための Apache Kafka コネクタが 1 回限りの保証で用意されています。
Maven の依存関係
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
Kafka シンクの構築
Kafka シンクには、KafkaSink のインスタンスを構築するためのビルダー クラスが用意されています。 これを使用してシンクを構築し、AKS 上の HDInsight で実行されている Flink クラスターと共に使用します
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
Java プログラム Event.java の作成
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
jar をパッケージ化してジョブを Flink に送信する
Webssh で jar をアップロードし、jar を送信します
Flink ダッシュボード UI の場合
Kafka で topic - clicks を作成する
Kafka で topic - events を使用する
リファレンス
- Apache Kafka コネクタ
- Apache、Apache Kafka、Kafka、Apache Flink、Flink、および関連するオープン ソース プロジェクト名は、 Apache Software Foundation (ASF) の 商標 です。