Kafka エコシステム用の Event Hubs での Apache Kafka の使用

このチュートリアルでは、プロトコル クライアントを変更したり、独自のクラスターを実行したりせずに、Event Hubs による Apache Kafka のサポートを使用して Akka Streams を接続する方法を示します。

このチュートリアルでは、以下の内容を学習します。

  • Event Hubs 名前空間を作成します
  • サンプル プロジェクトを複製する
  • Akka Streams プロデューサーを実行する
  • Akka Streams コンシューマーを実行する

Note

このサンプルは GitHub で入手できます。

前提条件

このチュートリアルを完了するには、次の前提条件を満たしている必要があります。

  • Apache Kafka 用の Event Hubs に関する記事を読む。
  • Azure サブスクリプション。 お持ちでない場合は、開始する前に無料アカウントを作成してください。
  • Java Development Kit (JDK) 1.8 以降
    • Ubuntu で apt-get install default-jdk を実行して JDK をインストールします。
    • 必ず、JDK のインストール先フォルダーを指すように JAVA_HOME 環境変数を設定してください。
  • Maven バイナリ アーカイブのダウンロードインストール
    • Ubuntu で apt-get install maven を実行して Maven をインストールします。
  • Git
    • Ubuntu で sudo apt-get install git を実行して Git をインストールします。

Event Hubs 名前空間を作成します

Event Hubs サービスとの間で送受信を行うには、Event Hubs 名前空間が必要です。 詳細については、イベント ハブの作成に関するページを参照してください。 後で使うので、イベント ハブの接続文字列をコピーしておきます。

サンプル プロジェクトを複製する

Event Hubs の接続文字列を入手したので、Kafka 用 Azure Event Hubs リポジトリをクローンし、akka サブフォルダーに移動します。

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java

Akka Streams プロデューサーを実行する

提供された Akka Streams プロデューサーの例を使用して、Event Hubs サービスにメッセージを送信します。

Event Hubs Kafka エンドポイントを指定する

プロデューサー application.conf

producer/src/main/resources/application.confbootstrap.servers 値と sasl.jaas.config 値を更新し、正しい認証を使用してプロデューサーを Event Hubs Kafka エンドポイントに転送します。

akka.kafka.producer {
    #Akka Kafka producer properties can be defined here


    # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
    # can be defined in this configuration section.
    kafka-clients {
        bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
        sasl.mechanism=PLAIN
        security.protocol=SASL_SSL
        sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX"; などがあります。

コマンド ラインからプロデューサーを実行する

コマンドラインからプロデューサーを実行するには、JAR を生成し、Maven 内から実行します (または、Maven を使用して JAR を生成し、必要な Kafka JAR をクラスパスに追加することによって、Java 内で実行します)。

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"

プロデューサーは、トピック test でイベント ハブへのイベントの送信を開始し、stdout にイベントを出力します。

Akka Streams コンシューマーを実行する

提供されたコンシューマーの例を使用して、イベント ハブからメッセージを受信します。

Event Hubs Kafka エンドポイントを指定する

コンシューマー application.conf

consumer/src/main/resources/application.confbootstrap.servers 値と sasl.jaas.config 値を更新し、正しい認証を使用してコンシューマーを Event Hubs Kafka エンドポイントに転送します。

akka.kafka.consumer {
    #Akka Kafka consumer properties defined here
    wakeup-timeout=60s

    # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
    # defined in this configuration section.
    kafka-clients {
       request.timeout.ms=60000
       group.id=akka-example-consumer

       bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
       sasl.mechanism=PLAIN
       security.protocol=SASL_SSL
       sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX"; などがあります。

コマンド ラインからコンシューマーを実行する

コマンドラインからコンシューマーを実行するには、JAR を生成し、Maven 内から実行します (または、Maven を使用して JAR を生成し、必要な Kafka JAR をクラスパスに追加することによって、Java 内で実行します)。

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"

イベント ハブにイベントがある場合 (たとえば、プロデューサーも実行されている場合)、コンシューマーはトピック test からのイベントの受信を開始します。

Akka Streams の詳細については、Akka Streams Kafka ガイドを参照してください。

次のステップ

Kafka 用 Event Hubs の詳細については、次の記事を参照してください。