Apache Flink® DataStream API'siyle Azure Data Lake Storage 2. Nesil olay iletileri yazma

Not

31 Ocak 2025'te AKS'de Azure HDInsight'ı kullanımdan kaldırmaya devam edeceğiz. 31 Ocak 2025'den önce, iş yüklerinizin aniden sonlandırılmasını önlemek için iş yüklerinizi Microsoft Fabric'e veya eşdeğer bir Azure ürününe geçirmeniz gerekir. Aboneliğinizdeki kalan kümeler durdurulur ve konaktan kaldırılır.

Kullanımdan kaldırma tarihine kadar yalnızca temel destek sağlanacaktır.

Önemli

Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.

Apache Flink, hem uygulamaların sonuçları hem de hataya dayanıklılık ve kurtarma için verileri kullanmak ve kalıcı olarak depolamak için dosya sistemlerini kullanır. Bu makalede DataStream API ile Azure Data Lake Storage 2. Nesil olay iletileri yazmayı öğrenin.

Önkoşullar

Bu dosya sistemi bağlayıcısı hem BATCH hem de STREAMING için aynı garantileri sağlar ve STREAMING yürütmesi için tam olarak bir kez semantik sağlamak üzere tasarlanmıştır. Daha fazla bilgi için bkz . Flink DataStream Filesystem.

Apache Kafka Bağlayıcısı

Flink, kafka konularından veri okumak ve kafka konularına veri yazmak için tam olarak bir kez garanti eden bir Apache Kafka bağlayıcısı sağlar. Daha fazla bilgi için bkz . Apache Kafka Bağlayıcısı.

IntelliJ IDEA'da pom.xml

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

ADLS 2. Nesil Havuzu için Program

abfsGen2.java

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class KafkaSinkToGen2 {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
        Configuration flinkConfig = new Configuration(); 

         flinkConfig.setString("classloader.resolve-order", "parent-first"); 

         env.getConfig().setGlobalJobParameters(flinkConfig);  

        // 2. read kafka message as stream input, update your broker ip's
        String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("click_events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // 3. sink to gen2, update container name and storage path
        String outputPath  = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        stream.sinkTo(sink);

        // 4. run stream
        env.execute("Kafka Sink To Gen2");
    }
}

Jar dosyasını paketleyip Apache Flink'e gönderin.

  1. Jar dosyasını ABFS'ye yükleyin.

    Flink uygulama modu ekranını gösteren ekran görüntüsü.

  2. Küme oluşturma işleminde iş jar bilgilerini AppMode geçirin.

    Uygulama oluşturma modunu gösteren ekran görüntüsü.

    Not

    classloader.resolve-order dosyasını 'parent-first' ve hadoop.classpath.enable olarak eklediğinizden emin olun true

  3. İş günlüklerini depolama hesabına göndermek için İş Günlüğü toplama'yı seçin.

    İş günlüğünü etkinleştirmeyi gösteren ekran görüntüsü.

  4. İşin çalıştığını görebilirsiniz.

    Flink kullanıcı arabirimini gösteren ekran görüntüsü.

ADLS 2. Nesil'de akış verilerini doğrulama

Akışı ADLS 2. Nesil'de görüyoruz click_events .

ADLS 2. Nesil çıkışını gösteren ekran görüntüsü.Flink tıklama olayı çıkışını gösteren ekran görüntüsü.

Devam eden bölüm dosyasını aşağıdaki üç koşuldan herhangi birinde toplayan bir sıralı ilke belirtebilirsiniz:

.withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())

Başvuru