AKS üzerinde HDInsight üzerinde Apache Flink® üzerinde Apache Kafka® tablosu oluşturma
Not
31 Ocak 2025'te AKS'de Azure HDInsight'ı kullanımdan kaldırmaya devam edeceğiz. 31 Ocak 2025'den önce, iş yüklerinizin aniden sonlandırılmasını önlemek için iş yüklerinizi Microsoft Fabric'e veya eşdeğer bir Azure ürününe geçirmeniz gerekir. Aboneliğinizdeki kalan kümeler durdurulur ve konaktan kaldırılır.
Kullanımdan kaldırma tarihine kadar yalnızca temel destek sağlanacaktır.
Önemli
Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.
Bu örneği kullanarak Apache FlinkSQL'de Kafka tablosu oluşturmayı öğrenin.
Önkoşullar
Apache Flink üzerinde Kafka SQL bağlayıcısı
Kafka bağlayıcısı, Kafka konu başlıklarından veri okumanızı ve kafka konularına veri yazmanızı sağlar. Daha fazla bilgi için apache Kafka SQL Bağlayıcısı'nı inceleyin.
Flink SQL'de Kafka tablosu oluşturma
HDInsight Kafka'da konu ve veri hazırlama
weblog.py ile iletileri hazırlama
import random
import json
import time
from datetime import datetime
user_set = [
'John',
'XiaoMing',
'Mike',
'Tom',
'Machael',
'Zheng Hu',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
web_set = [
'https://google.com',
'https://facebook.com?id=1',
'https://tmall.com',
'https://baidu.com',
'https://taobao.com',
'https://aliyun.com',
'https://apache.com',
'https://flink.apache.com',
'https://hbase.apache.com',
'https://github.com',
'https://gmail.com',
'https://stackoverflow.com',
'https://python.org'
]
def main():
while True:
if random.randrange(10) < 4:
url = random.choice(web_set[:3])
else:
url = random.choice(web_set)
log_entry = {
'userName': random.choice(user_set),
'visitURL': url,
'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
}
print(json.dumps(log_entry))
time.sleep(0.05)
if __name__ == "__main__":
main()
Kafka'ya işlem hattı konusu
sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Diğer komutlar:
-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092
-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic click_events --bootstrap-server wn0-contsk:9092
-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}
Apache Flink SQL istemcisi
Flink SQL istemcisi için Secure Shell'in nasıl kullanılacağı hakkında ayrıntılı yönergeler sağlanır.
Kafka SQL Bağlayıcısı ve Bağımlılıklarını SSH'ye indirin
Aşağıdaki adımda Kafka 3.2.0 bağımlılıklarını kullanıyoruz. KOMUTU HDInsight kümesindeki Kafka sürümünüz temelinde güncelleştirmeniz gerekir.
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
Apache Flink SQL İstemcisi'ne bağlanma
Şimdi Kafka SQL istemci jar'ları ile Flink SQL İstemcisi'ne bağlanalım.
msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar
Apache Flink SQL'de Kafka tablosu oluşturma
Şimdi Flink SQL'de Kafka tablosunu oluşturalım ve Flink SQL'de Kafka tablosunu seçelim.
Aşağıdaki kod parçacığında Kafka bootstrap sunucu IP'lerinizi güncelleştirmeniz gerekir.
CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
select * from KafkaTable;
Kafka iletileri oluşturma
Şimdi HDInsight Kafka kullanarak aynı konuya Kafka iletileri üretelim.
python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events
Apache Flink SQL tablosu
Tabloyu Flink SQL'de izleyebilirsiniz.
Flink Web kullanıcı arabirimindeki akış işleri aşağıdadır.
Başvuru
- Apache Kafka SQL Bağlayıcısı
- Apache, Apache Kafka, Kafka, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.