Použití Apache Kafka ve službě HDInsight se službou Azure IoT Hub
Zjistěte, jak pomocí konektoru Apache Kafka Připojení Azure IoT Hubu přesouvat data mezi Apache Kafka ve službě HDInsight a Azure IoT Hubem. V tomto dokumentu se dozvíte, jak spustit konektor IoT Hubu z hraničního uzlu v clusteru.
Rozhraní Kafka Připojení API umožňuje implementovat konektory, které průběžně načítá data do Systému Kafka, nebo odesílat data ze systému Kafka do jiného systému. Apache Kafka Připojení Azure IoT Hub je konektor, který načítá data z Azure IoT Hubu do Kafka. Může také odesílat data ze systému Kafka do IoT Hubu.
Při načítání ze služby IoT Hub použijete zdrojový konektor. Při odesílání do IoT Hubu použijete konektor jímky . Konektor IoT Hubu poskytuje konektory zdroje i jímky.
Následující diagram znázorňuje tok dat mezi Azure IoT Hubem a Kafka ve službě HDInsight při použití konektoru.
Další informace o rozhraní PŘIPOJENÍ API najdete v tématu https://kafka.apache.org/documentation/#connect.
Požadavky
Cluster Apache Kafka ve službě HDInsight. Další informace najdete v dokumentu Rychlý start k systému Kafka ve službě HDInsight.
Hraniční uzel v clusteru Kafka. Další informace najdete v tématu Použití hraničních uzlů s dokumentem HDInsight .
Klient SSH. Další informace najdete v tématu Připojení ke službě HDInsight (Apache Hadoop) pomocí SSH.
Azure IoT Hub a zařízení. V tomto článku zvažte použití online simulátoru Raspberry Pi Připojení do služby Azure IoT Hub.
Nástroj pro sestavení Scala
Sestavení konektoru
Stáhněte zdroj konektoru z https://github.com/Azure/toketi-kafka-connect-iothub/ místního prostředí.
Z příkazového řádku přejděte do
toketi-kafka-connect-iothub-master
adresáře. Pak pomocí následujícího příkazu sestavte a zabalte projekt:sbt assembly
Dokončení sestavení trvá několik minut. Příkaz vytvoří soubor pojmenovaný
kafka-connect-iothub-assembly_2.11-0.7.0.jar
vtoketi-kafka-connect-iothub-master\target\scala-2.11
adresáři projektu.
Instalace konektoru
Nahrajte soubor .jar do hraničního uzlu vašeho kafka v clusteru HDInsight. Upravte následující příkaz nahrazením
CLUSTERNAME
skutečného názvu clusteru. Výchozí hodnoty uživatelského účtu SSH a názvu hraničního uzlu se použijí a upraví podle potřeby.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Po dokončení kopírování souboru se připojte k hraničnímu uzlu pomocí SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Pokud chcete konektor nainstalovat do adresáře Kafka
libs
, použijte následující příkaz:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Ponechte připojení SSH aktivní pro zbývající kroky.
Konfigurace Apache Kafka
Z připojení SSH k hraničnímu uzlu pomocí následujících kroků nakonfigurujte Kafka tak, aby spouštět konektor v samostatném režimu:
Nastavte proměnnou hesla. Nahraďte heslo heslem pro přihlášení ke clusteru a pak zadejte příkaz:
export password='PASSWORD'
Nainstalujte nástroj jq. Jq usnadňuje zpracování dokumentů JSON vrácených z dotazů Ambari. Zadejte tento příkaz:
sudo apt -y install jq
Získejte adresu zprostředkovatelů Kafka. V clusteru může být mnoho zprostředkovatelů, ale stačí odkazovat jenom na jednu nebo dvě. Pokud chcete získat adresu dvou hostitelů zprostředkovatele, použijte následující příkaz:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Zkopírujte hodnoty pro pozdější použití. Vrácená hodnota je obdobná následujícímu textu:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Získejte adresu uzlů Apache Zookeeper. V clusteru je několik uzlů Zookeeper, ale stačí odkazovat jenom na jeden nebo dva. Pomocí následujícího příkazu uložte adresy do proměnné
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Při spuštění konektoru v samostatném režimu
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
se soubor používá ke komunikaci s zprostředkovateli Kafka. Pokud chcete soubor upravitconnect-standalone.properties
, použijte následující příkaz:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Proveďte následující úpravy:
Aktuální hodnota Nová hodnota Komentář bootstrap.servers=localhost:9092
localhost:9092
Nahraďte hodnotu hostiteli zprostředkovatele z předchozího kroku.Nakonfiguruje samostatnou konfiguraci hraničního uzlu pro vyhledání zprostředkovatelů Kafka. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Tato změna umožňuje testovat pomocí producenta konzoly, který je součástí systému Kafka. Možná budete potřebovat různé převaděče pro ostatní producenty a spotřebitele. Informace o použití jiných hodnot převaděče naleznete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Stejné jako dané. – consumer.max.poll.records=10
Přidat na konec souboru Touto změnou zabráníte vypršení časových limitů konektoru jímky tím, že ho omezíte na 10 záznamů najednou. Další informace najdete na webu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Pokud chcete soubor uložit, použijte ctrl + X, Y a pak Enter.
Pokud chcete vytvořit témata používaná konektorem, použijte následující příkazy:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
K ověření
iotin
existence těchto tématiotout
použijte následující příkaz:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
Toto
iotin
téma se používá k příjmu zpráv ze služby IoT Hub. Totoiotout
téma se používá k odesílání zpráv do IoT Hubu.
Získání informací o připojení ke službě IoT Hub
Pokud chcete načíst informace o službě IoT Hub používané konektorem, postupujte následovně:
Získejte koncový bod kompatibilní s centrem událostí a název koncového bodu kompatibilního s centrem událostí pro vaše centrum IoT. K získání těchto informací použijte jednu z následujících metod:
Na webu Azure Portal použijte následující kroky:
Přejděte do služby IoT Hub a vyberte Koncové body.
V předdefinovaných koncových bodech vyberte Události.
Z vlastností zkopírujte hodnotu následujících polí:
- Název kompatibilní s centrem událostí
- Koncový bod kompatibilní s centrem událostí
- Oddíly
Důležité
Hodnota koncového bodu z portálu může obsahovat další text, který není v tomto příkladu potřeba. Extrahujte text, který odpovídá tomuto vzoru
sb://<randomnamespace>.servicebus.windows.net/
.
V Azure CLI použijte následující příkaz:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Nahraďte
myhubname
názvem vašeho centra IoT. Odpověď je podobná následujícímu textu:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Získejte zásady a klíč sdíleného přístupu. V tomto příkladu použijte klíč služby . K získání těchto informací použijte jednu z následujících metod:
Na webu Azure Portal použijte následující kroky:
- Vyberte zásady sdíleného přístupu a pak vyberte službu.
- Zkopírujte hodnotu primárního klíče .
- Zkopírujte Připojení ion řetězec – hodnota primárního klíče.
V Azure CLI použijte následující příkaz:
Pokud chcete získat hodnotu primárního klíče, použijte následující příkaz:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Nahraďte
myhubname
názvem vašeho centra IoT. Odpověď je primárním klíčem k zásadámservice
pro toto centrum.K získání připojovací řetězec pro zásadu
service
použijte následující příkaz:az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
Nahraďte
myhubname
názvem vašeho centra IoT. Odpověď je připojovací řetězec zásadyservice
.
Konfigurace zdrojového připojení
Pokud chcete nakonfigurovat zdroj pro práci se službou IoT Hub, proveďte následující akce z připojení SSH k hraničnímu uzlu:
Vytvořte kopii
connect-iot-source.properties
souboru v/usr/hdp/current/kafka-broker/config/
adresáři. Pokud chcete stáhnout soubor z projektu toketi-kafka-connect-iothub, použijte následující příkaz:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Pokud chcete soubor upravit
connect-iot-source.properties
a přidat informace o službě IoT Hub, použijte následující příkaz:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
V editoru vyhledejte a změňte následující položky:
Aktuální hodnota Upravit Kafka.Topic=PLACEHOLDER
Nahraďte PLACEHOLDER
iotin
. Zprávy přijaté ze služby IoT Hub se umístí doiotin
tématu.IotHub.EventHubCompatibleName=PLACEHOLDER
Nahraďte PLACEHOLDER
názvem kompatibilním s centrem událostí.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
Nahraďte PLACEHOLDER
koncovým bodem kompatibilním s centrem událostí.IotHub.AccessKeyName=PLACEHOLDER
Nahraďte PLACEHOLDER
service
.IotHub.AccessKeyValue=PLACEHOLDER
Nahraďte PLACEHOLDER
primárním klíčemservice
zásady.IotHub.Partitions=PLACEHOLDER
Nahraďte PLACEHOLDER
počtem oddílů z předchozích kroků.IotHub.StartTime=PLACEHOLDER
Nahraďte PLACEHOLDER
datem UTC. Toto datum je, když konektor začne kontrolovat zprávy. Formát data jeyyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Nahraďte 100
5
. Tato změna způsobí, že konektor bude číst zprávy do Systému Kafka, jakmile bude v IoT Hubu pět nových zpráv.Příklad konfigurace najdete v tématu Kafka Připojení Source Připojení or pro Azure IoT Hub.
Pokud chcete uložit změny, použijte ctrl + X, Y a pak Enter.
Další informace o konfiguraci zdroje konektoru najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Konfigurace připojení jímky
Pokud chcete nakonfigurovat připojení jímky tak, aby fungovalo se službou IoT Hub, proveďte následující akce z připojení SSH k hraničnímu uzlu:
Vytvořte kopii
connect-iothub-sink.properties
souboru v/usr/hdp/current/kafka-broker/config/
adresáři. Pokud chcete stáhnout soubor z projektu toketi-kafka-connect-iothub, použijte následující příkaz:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Pokud chcete soubor upravit
connect-iothub-sink.properties
a přidat informace o službě IoT Hub, použijte následující příkaz:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
V editoru vyhledejte a změňte následující položky:
Aktuální hodnota Upravit topics=PLACEHOLDER
Nahraďte PLACEHOLDER
iotout
. Zprávy zapsané doiotout
tématu se přeposílají do centra IoT.IotHub.ConnectionString=PLACEHOLDER
Nahraďte PLACEHOLDER
připojovací řetězec zásadyservice
.Příklad konfigurace najdete v tématu Kafka Připojení Sink Připojení or pro Azure IoT Hub.
Pokud chcete uložit změny, použijte ctrl + X, Y a pak Enter.
Další informace o konfiguraci jímky konektoru najdete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Spuštění zdrojového konektoru
Ke spuštění zdrojového konektoru použijte následující příkaz z připojení SSH k hraničnímu uzlu:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Jakmile se konektor spustí, odešlete zprávy do ioT Hubu z vašich zařízení. Když konektor čte zprávy ze služby IoT Hub a ukládá je do tématu Kafka, protokoluje informace do konzoly:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Poznámka:
Při spuštění konektoru se může zobrazit několik upozornění. Tato upozornění nezpůsobují problémy s příjmem zpráv ze služby IoT Hub.
Zastavte spojnici po několika minutách pomocí kombinace kláves Ctrl + C dvakrát. Zastavení konektoru trvá několik minut.
Spuštění konektoru jímky
Pomocí následujícího příkazu z připojení SSH ke hraničnímu uzlu spusťte konektor jímky v samostatném režimu:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Při spuštění konektoru se zobrazí podobné informace jako v následujícím textu:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Poznámka:
Při spuštění konektoru si můžete všimnout několika upozornění. Ta můžete bezpečně ignorovat.
Odesílání zpráv
Pokud chcete odesílat zprávy přes konektor, postupujte následovně:
Otevřete druhou relaci SSH pro cluster Kafka:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Získejte adresu zprostředkovatelů Kafka pro novou relaci SSH. Nahraďte heslo heslem pro přihlášení ke clusteru a pak zadejte příkaz:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
K odesílání zpráv do
iotout
tématu použijte následující příkaz:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Tento příkaz vás nevrátí do normálního příkazového řádku Bash. Místo toho odesílá do tématu vstup
iotout
pomocí klávesnice.Pokud chcete odeslat zprávu do zařízení, vložte dokument JSON do relace SSH pro dané
kafka-console-producer
zařízení.Důležité
Je nutné nastavit hodnotu
"deviceId"
položky na ID vašeho zařízení. V následujícím příkladu má zařízení názevmyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
Schéma pro tento dokument JSON je podrobněji popsáno na adrese https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Pokud používáte simulované zařízení Raspberry Pi a je spuštěné, zařízení zaznamená následující zprávu:
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Další informace o použití konektoru jímky naleznete v tématu https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Další kroky
V tomto dokumentu jste zjistili, jak pomocí rozhraní Apache Kafka Připojení API spustit ioT Kafka Připojení or ve službě HDInsight. Pomocí následujících odkazů můžete zjistit další způsoby práce se systémem Kafka: