Integrieren der Apache Kafka Connect-Unterstützung in Azure Event Hubs
Apache Kafka Connect ist ein Framework zum Verbinden und Importieren/Exportieren von Daten aus einem beliebigen externen System bzw. in ein beliebiges externes System wie MySQL, HDFS und ein Dateisystem über einen Kafka-Cluster. In diesem Artikel wird die Nutzung eines Kafka Connect-Frameworks mit Event Hubs Schritt für Schritt beschrieben.
Dieser Artikel führt Sie schrittweise durch die Vorgehensweise zum Integrieren von Kafka Connect in einen Event Hub und zum Bereitstellen von einfachen FileStreamSource
- und FileStreamSink
-Connectors. Die Connectors sind zwar nicht für die Verwendung in der Produktion bestimmt, aber sie stellen ein End-to-End-Szenario für Kafka Connect dar, bei dem Azure Event Hubs als Kafka-Broker fungiert.
Hinweis
Dieses Beispiel ist auf GitHub verfügbar.
Voraussetzungen
Stellen Sie für das Durcharbeiten dieses Tutorials zur Vorgehensweise sicher, dass die folgenden Voraussetzungen erfüllt sind:
- Azure-Abonnement. Falls Sie nicht über ein Abonnement verfügen, können Sie ein kostenloses Konto erstellen.
- Git-Client
- Linux/macOS
- Die neueste Kafka-Version von kafka.apache.org.
- Lesen Sie den Einführungsartikel Event Hubs für Apache Kafka.
Erstellen eines Event Hubs-Namespace
Ein Event Hubs-Namespace ist erforderlich, um Nachrichten an einen Event Hubs-Dienst zu senden und von diesem zu empfangen. Anweisungen zum Erstellen eines Namespace und eines Event Hub finden Sie unter Erstellen eines Event Hubs. Rufen Sie die Event Hubs-Verbindungszeichenfolge und den vollqualifizierten Domänennamen (Fully Qualified Domain Name, FQDN) zur späteren Verwendung ab. Anweisungen hierzu finden Sie unter Get an Event Hubs connection string (Abrufen einer Event Hubs-Verbindungszeichenfolge).
Klonen des Beispielprojekts
Klonen Sie das Azure Event Hubs-Repository, und navigieren Sie zum Unterordner „tutorials/connect“:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Konfigurieren von Kafka Connect für Event Hubs
Es ist nur eine minimale Neukonfiguration erforderlich, wenn Sie den Kafka Connect-Durchsatz von Kafka an Event Hubs umleiten. Im folgenden Beispiel connect-distributed.properties
ist dargestellt, wie Sie Connect konfigurieren, um die Authentifizierung und Kommunikation mit dem Kafka-Endpunkt unter Event Hubs einzurichten:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Wichtig
Ersetzen Sie {YOUR.EVENTHUBS.CONNECTION.STRING}
durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace. Anweisungen zum Abrufen der Verbindungszeichenfolge finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge. Hier sehen Sie eine Beispielkonfiguration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Ausführen von Kafka Connect
In diesem Schritt wird ein Kafka Connect-Worker lokal im verteilten Modus gestartet, indem Event Hubs zum Warten des Clusterzustands verwendet wird.
- Speichern Sie die Datei
connect-distributed.properties
lokal. Achten Sie darauf, alle Werte in geschweiften Klammern zu ersetzen. - Navigieren Sie zum Speicherort des Kafka-Release auf Ihrem Computer.
- Führen Sie
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
aus. Die Connect-Worker-REST-API ist für die Interaktion bereit, wenn'INFO Finished starting connectors and tasks'
angezeigt wird.
Hinweis
Kafka Connect verwendet die Kafka-AdminClient-API, um automatisch Themen mit empfohlenen Konfigurationen, einschließlich Komprimierung, zu erstellen. Eine schnelle Überprüfung des Namespace im Azure-Portal ergibt, dass die internen Themen des Connect-Workers automatisch erstellt wurden.
Interne Kafka Connect-Themen müssen Komprimierung verwenden. Das Event Hubs-Team ist nicht für die Korrektur falscher Konfigurationen zuständig, sollten interne Connect-Themen nicht ordnungsgemäß konfiguriert sein.
Erstellen von Connectors
In diesem Abschnitt wird das Hochfahren von FileStreamSource
- und FileStreamSink
-Connectors erläutert.
Erstellen Sie ein Verzeichnis für Ein- und Ausgabedatendateien.
mkdir ~/connect-quickstart
Erstellen Sie zwei Dateien: eine Datei mit Startdaten, aus der der
FileStreamSource
-Connector liest, und eine weitere Datei, in die derFileStreamSink
-Connector schreibt.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Erstellen Sie einen
FileStreamSource
-Connector. Achten Sie darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
In Ihrer Event Hubs-Instanz sollte nach dem Ausführen des Befehls der Event Hub
connect-quickstart
angezeigt werden.Überprüfen Sie den Status des Quellconnectors.
curl -s http://localhost:8083/connectors/file-source/status
Optional können Sie Service Bus Explorer nutzen, um zu überprüfen, ob Ereignisse im Thema
connect-quickstart
eingegangen sind.Erstellen Sie einen FileStreamSink-Connector. Achten Sie auch hier wieder darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Überprüfen Sie den Status des Senkenconnectors.
curl -s http://localhost:8083/connectors/file-sink/status
Stellen Sie sicher, dass die Daten zwischen den Dateien repliziert wurden und in beiden Dateien identisch sind.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Cleanup
Mit Kafka Connect werden Event Hub-Themen zum Speichern von Konfigurationen, Offsets und des Status erstellt, die auch dann beibehalten werden, nachdem der Connect-Cluster heruntergefahren wurde. Sofern diese Persistenz nicht gewünscht ist, empfehlen wir, diese Themen zu löschen. Sie können auch die Event Hubs-Instanzen vom Typ connect-quickstart
löschen, die in dieser exemplarischen Vorgehensweise erstellt wurden.
Zugehöriger Inhalt
Weitere Informationen zu Event Hubs für Kafka finden Sie in folgenden Artikeln: