Apache Flink® DataStream API'siyle Azure Data Lake Storage 2. Nesil olay iletileri yazma
Not
31 Ocak 2025'te AKS'de Azure HDInsight'ı kullanımdan kaldırmaya devam edeceğiz. 31 Ocak 2025'den önce, iş yüklerinizin aniden sonlandırılmasını önlemek için iş yüklerinizi Microsoft Fabric'e veya eşdeğer bir Azure ürününe geçirmeniz gerekir. Aboneliğinizdeki kalan kümeler durdurulur ve konaktan kaldırılır.
Kullanımdan kaldırma tarihine kadar yalnızca temel destek sağlanacaktır.
Önemli
Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.
Apache Flink, hem uygulamaların sonuçları hem de hataya dayanıklılık ve kurtarma için verileri kullanmak ve kalıcı olarak depolamak için dosya sistemlerini kullanır. Bu makalede DataStream API ile Azure Data Lake Storage 2. Nesil olay iletileri yazmayı öğrenin.
Önkoşullar
- AKS üzerinde HDInsight üzerinde Apache Flink kümesi
- HDInsight üzerinde Apache Kafka kümesi
- HDInsight'ta Apache Kafka kullanma konusunda açıklandığı gibi ağ ayarlarının dikkate alındığından emin olmanız gerekir. AKS ve HDInsight kümelerindeki HDInsight'ın aynı Sanal Ağ olduğundan emin olun.
- ADLS 2. Nesil'e erişmek için MSI kullanma
- AKS Sanal Ağ üzerinde HDInsight'ta Azure VM'sinde geliştirme için IntelliJ
Apache Flink FileSystem bağlayıcısı
Bu dosya sistemi bağlayıcısı hem BATCH hem de STREAMING için aynı garantileri sağlar ve STREAMING yürütmesi için tam olarak bir kez semantik sağlamak üzere tasarlanmıştır. Daha fazla bilgi için bkz . Flink DataStream Filesystem.
Apache Kafka Bağlayıcısı
Flink, kafka konularından veri okumak ve kafka konularına veri yazmak için tam olarak bir kez garanti eden bir Apache Kafka bağlayıcısı sağlar. Daha fazla bilgi için bkz . Apache Kafka Bağlayıcısı.
Apache Flink için projeyi derleme
IntelliJ IDEA'da pom.xml
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
ADLS 2. Nesil Havuzu için Program
abfsGen2.java
Not
HDInsight kümesi bootStrapServers üzerinde Apache Kafka'nın yerine Kafka 3.2 için kendi aracılarınızı ekleyin
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class KafkaSinkToGen2 {
public static void main(String[] args) throws Exception {
// 1. get stream execution env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration flinkConfig = new Configuration();
flinkConfig.setString("classloader.resolve-order", "parent-first");
env.getConfig().setGlobalJobParameters(flinkConfig);
// 2. read kafka message as stream input, update your broker ip's
String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("click_events")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.print();
// 3. sink to gen2, update container name and storage path
String outputPath = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
stream.sinkTo(sink);
// 4. run stream
env.execute("Kafka Sink To Gen2");
}
}
Jar dosyasını paketleyip Apache Flink'e gönderin.
Jar dosyasını ABFS'ye yükleyin.
Küme oluşturma işleminde iş jar bilgilerini
AppMode
geçirin.Not
classloader.resolve-order dosyasını 'parent-first' ve hadoop.classpath.enable olarak eklediğinizden emin olun
true
İş günlüklerini depolama hesabına göndermek için İş Günlüğü toplama'yı seçin.
İşin çalıştığını görebilirsiniz.
ADLS 2. Nesil'de akış verilerini doğrulama
Akışı ADLS 2. Nesil'de görüyoruz click_events
.
Devam eden bölüm dosyasını aşağıdaki üç koşuldan herhangi birinde toplayan bir sıralı ilke belirtebilirsiniz:
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(5))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
Başvuru
- Apache Kafka Bağlayıcısı
- Flink DataStream Dosya Sistemi
- Apache Flink Web Sitesi
- Apache, Apache Kafka, Kafka, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.