Azure Event Hubs'a Apache Kafka Connect desteğiyle tümleştirme
Apache Kafka Connect , Bir Kafka kümesi aracılığıyla MySQL, HDFS ve dosya sistemi gibi herhangi bir dış sisteme veri bağlamayı ve içeri/dışarı aktarmayı sağlayan bir çerçevedir. Bu makalede, Event Hubs ile Kafka Connect çerçevesi kullanma konusunda size yol gösterir.
Bu makalede Kafka Connect'i bir olay hub'ı ile tümleştirme ve temel FileStreamSource
ve FileStreamSink
bağlayıcıları dağıtma adımları açıklenmektedir. Bu bağlayıcılar üretim kullanımı için tasarlanmamış olsa da, Azure Event Hubs'ın Kafka aracısı olarak davrandığı uçtan uca bir Kafka Connect senaryosu gösterir.
Not
Bu örnek GitHub'da sağlanır.
Önkoşullar
Bu yol gösterici adımları tamamlamak için aşağıdaki önkoşulların karşılandığından emin olun:
- Azure aboneliği. Aboneliğiniz yoksa ücretsiz bir hesap oluşturun.
- Git
- Linux/MacOS
- en son Kafka sürümü kafka.apache.org
- Apache Kafka için Event Hubs giriş makalesini okuyun
Event Hubs ad alanı oluşturma
Herhangi bir Event Hubs hizmetinden göndermek ve almak için Event Hubs ad alanı gereklidir. Ad alanı ve olay hub'ı oluşturma yönergeleri için bkz. Olay hub'ı oluşturma. Daha sonra kullanmak üzere Event Hubs bağlantı dizesini ve tam etki alanı adını (FQDN) alın. Yönergeler için bkz. Event Hubs bağlantı dizesi alma.
Örnek projeyi kopyalama
Azure Event Hubs deposunu kopyalayın ve tutorials/connect alt klasörüne gidin:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Event Hubs için Kafka Connect'i yapılandırma
Kafka Connect aktarım hızını Kafka'dan Event Hubs'a yeniden yönlendirmek için çok az yeniden yapılandırma gerekir. Aşağıdaki connect-distributed.properties
örneğinde, Event Hubs'da Kafka uç noktasıyla kimlik doğrulaması yapmak ve iletişim kurmak için Connect'in nasıl yapılandırılacağı gösterilir:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Önemli
değerini Event Hubs ad alanınızın bağlantı dizesi ile değiştirin{YOUR.EVENTHUBS.CONNECTION.STRING}
. bağlantı dizesi alma yönergeleri için bkz. Event Hubs bağlantı dizesi alma. Aşağıda örnek bir yapılandırma verilmişti: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Kafka Connect'i çalıştırma
Bu adımda, bir Kafka Connect çalışanı dağıtılmış modda yerel olarak başlatılır ve küme durumunu korumak için Event Hubs kullanılır.
connect-distributed.properties
Dosyayı yerel olarak kaydedin. Küme ayracı içindeki tüm değerleri değiştirdiğinizden emin olun.- Makinenizde Kafka sürümünün konumuna gidin.
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
'i çalıştırın.'INFO Finished starting connectors and tasks'
iletisini gördüğünüzde Connect çalışanı REST API etkileşime hazır demektir.
Not
Kafka Connect, sıkıştırma da dahil olmak üzere önerilen yapılandırmalara sahip konuları otomatik olarak oluşturmak için Kafka AdminClient API'sini kullanır. Azure portalında ad alanına hızla göz attığınızda, Connect çalışanı iç konusunun otomatik olarak oluşturulduğu ortaya çıkar.
Kafka Connect iç konu başlıkları sıkıştırma kullanmalıdır. İç Connect konuları yanlış yapılandırılırsa, Event Hubs ekibi hatalı yapılandırmaları düzeltmekle sorumlu değildir.
Bağlayıcıları oluşturma
Bu bölümde, döndürme FileStreamSource
ve FileStreamSink
bağlayıcılar için size yol gösterir.
Giriş ve çıkış veri dosyaları için bir dizin oluşturun.
mkdir ~/connect-quickstart
İki dosya oluşturun: bağlayıcının
FileStreamSource
okuduğu tohum verilerine sahip bir dosya ve bağlayıcımızınFileStreamSink
yazdığı başka bir dosya.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
FileStreamSource
Bağlayıcı oluşturun. Küme ayraçlarının içindeki değeri giriş dizin yolunuzla değiştirmeyi unutmayın.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Komutunu çalıştırdıktan sonra Event Hubs örneğinizde olay hub'ını
connect-quickstart
görmeniz gerekir.Kaynak bağlayıcının durumunu denetleyin.
curl -s http://localhost:8083/connectors/file-source/status
İsteğe bağlı olarak, olayların konuya geldiğini doğrulamak için Service Bus Gezgini'ni
connect-quickstart
kullanabilirsiniz.FileStreamSink Bağlayıcısını oluşturun. Küme ayraçlarının içindeki değeri giriş dizin yolunuzla değiştirdiğinizden emin olun.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Havuz bağlayıcısının durumunu denetleyin.
curl -s http://localhost:8083/connectors/file-sink/status
Verilerin dosyalar arasında çoğaltıldığını ve her iki dosyada da aynı veriler bulunduğunu doğrulayın.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Temizleme
Kafka Connect, Connect kümesi indirildikten sonra bile kalıcı olan yapılandırmaları, uzaklıkları ve durumu depolamak için Event Hubs konuları oluşturur. Bu kalıcılık istenmediği sürece, bu konuları silmenizi öneririz. Bu izlenecek yol sırasında oluşturulan Event Hubs'ı da silmek connect-quickstart
isteyebilirsiniz.
İlgili içerik
Kafka için Event Hubs hakkında daha fazla bilgi edinmek için aşağıdaki makalelere bakın: